]> git.ipfire.org Git - thirdparty/squid.git/blob - src/fs/rock/RockSwapDir.cc
Merge Coverity Fixes
[thirdparty/squid.git] / src / fs / rock / RockSwapDir.cc
1 /*
2 * Copyright (C) 1996-2015 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 47 Store Directory Routines */
10
11 #include "squid.h"
12 #include "cache_cf.h"
13 #include "CollapsedForwarding.h"
14 #include "ConfigOption.h"
15 #include "DiskIO/DiskIOModule.h"
16 #include "DiskIO/DiskIOStrategy.h"
17 #include "DiskIO/ReadRequest.h"
18 #include "DiskIO/WriteRequest.h"
19 #include "fs/rock/RockIoRequests.h"
20 #include "fs/rock/RockIoState.h"
21 #include "fs/rock/RockRebuild.h"
22 #include "fs/rock/RockSwapDir.h"
23 #include "globals.h"
24 #include "ipc/mem/Pages.h"
25 #include "MemObject.h"
26 #include "Parsing.h"
27 #include "SquidConfig.h"
28 #include "SquidMath.h"
29 #include "tools.h"
30
31 #include <cstdlib>
32 #include <iomanip>
33 #include <limits>
34
35 #if HAVE_SYS_STAT_H
36 #include <sys/stat.h>
37 #endif
38
/// Bytes reserved at the start of the db file, ahead of the first slot;
/// diskOffset() places slot 0 immediately after this header.
const int64_t Rock::SwapDir::HeaderSize = 16*1024;
40
// Starts with slotSize equal to HeaderSize; parseSizeOption() may override
// it via the slot-size cache_dir option. map/io/filePath are created in
// init()/parse().
Rock::SwapDir::SwapDir(): ::SwapDir("rock"),
    slotSize(HeaderSize), filePath(NULL), map(NULL), io(NULL),
    waitingForPage(NULL)
{
}
46
// Releases the DiskIO strategy, the shared map wrapper, and the
// heap-allocated db file path (in that order).
Rock::SwapDir::~SwapDir()
{
    delete io;
    delete map;
    safe_free(filePath);
}
53
/// Store search is not implemented for Rock; callers must not reach this.
StoreSearch *
Rock::SwapDir::search(String const, HttpRequest *)
{
    assert(false);
    return NULL; // XXX: implement
}
60
/// asynchronous lookup API; Rock simply delegates to the base class
void
Rock::SwapDir::get(String const key, STOREGETCLIENT cb, void *data)
{
    ::SwapDir::get(key, cb, data);
}
66
// called when Squid core needs a StoreEntry with a given key
StoreEntry *
Rock::SwapDir::get(const cache_key *key)
{
    // cannot lookup until the shared map and db file are usable
    if (!map || !theFile || !theFile->canRead())
        return NULL;

    sfileno filen;
    // openForReading() read-locks the matching anchor (if any)
    const Ipc::StoreMapAnchor *const slot = map->openForReading(key, filen);
    if (!slot)
        return NULL;

    // create a brand new store entry and initialize it with stored basics
    StoreEntry *e = new StoreEntry();
    anchorEntry(*e, filen, *slot);

    e->hashInsert(key);
    trackReferences(*e);

    return e;
    // the disk entry remains open for reading, protected from modifications
}
89
/// Attaches a collapsed entry to its on-disk counterpart, if any.
/// \param inSync set to whether the entry was brought up-to-date
/// \returns true when an on-disk anchor was found (even if inSync is false)
bool
Rock::SwapDir::anchorCollapsed(StoreEntry &collapsed, bool &inSync)
{
    if (!map || !theFile || !theFile->canRead())
        return false;

    sfileno filen;
    const Ipc::StoreMapAnchor *const slot = map->openForReading(
            reinterpret_cast<cache_key*>(collapsed.key), filen);
    if (!slot)
        return false;

    anchorEntry(collapsed, filen, *slot);
    inSync = updateCollapsedWith(collapsed, *slot);
    return true; // even if inSync is false
}
106
/// Refreshes a previously anchored collapsed entry from its shared anchor.
/// \returns whether the entry is now in sync with the disk store
bool
Rock::SwapDir::updateCollapsed(StoreEntry &collapsed)
{
    if (!map || !theFile || !theFile->canRead())
        return false;

    if (collapsed.swap_filen < 0) // no longer using a disk cache
        return true;
    assert(collapsed.swap_dirn == index);

    const Ipc::StoreMapAnchor &s = map->readableEntry(collapsed.swap_filen);
    return updateCollapsedWith(collapsed, s);
}
120
121 bool
122 Rock::SwapDir::updateCollapsedWith(StoreEntry &collapsed, const Ipc::StoreMapAnchor &anchor)
123 {
124 collapsed.swap_file_sz = anchor.basics.swap_file_sz;
125 return true;
126 }
127
/// Initializes a StoreEntry from its shared-memory anchor: copies the stored
/// basics, derives store/swap status from anchor completeness, and binds the
/// entry to this cache_dir and the given file number.
void
Rock::SwapDir::anchorEntry(StoreEntry &e, const sfileno filen, const Ipc::StoreMapAnchor &anchor)
{
    const Ipc::StoreMapAnchor::Basics &basics = anchor.basics;

    e.swap_file_sz = basics.swap_file_sz;
    e.lastref = basics.lastref;
    e.timestamp = basics.timestamp;
    e.expires = basics.expires;
    e.lastmod = basics.lastmod;
    e.refcount = basics.refcount;
    e.flags = basics.flags;

    if (anchor.complete()) {
        e.store_status = STORE_OK;
        e.swap_status = SWAPOUT_DONE;
    } else {
        e.store_status = STORE_PENDING;
        e.swap_status = SWAPOUT_WRITING; // even though another worker writes?
    }

    e.ping_status = PING_NONE;

    // a freshly anchored on-disk entry is, by definition, validated and public
    EBIT_CLR(e.flags, RELEASE_REQUEST);
    EBIT_CLR(e.flags, KEY_PRIVATE);
    EBIT_SET(e.flags, ENTRY_VALIDATED);

    e.swap_dirn = index;
    e.swap_filen = filen;
}
158
/// Breaks the association between a StoreEntry and its db slot, releasing
/// whichever map lock (read or write) this worker holds for the entry.
void Rock::SwapDir::disconnect(StoreEntry &e)
{
    assert(e.swap_dirn == index);
    assert(e.swap_filen >= 0);
    // cannot have SWAPOUT_NONE entry with swap_filen >= 0
    assert(e.swap_status != SWAPOUT_NONE);

    // do not rely on e.swap_status here because there is an async delay
    // before it switches from SWAPOUT_WRITING to SWAPOUT_DONE.

    // since e has swap_filen, its slot is locked for reading and/or writing
    // but it is difficult to know whether THIS worker is reading or writing e,
    // especially since we may switch from writing to reading. This code relies
    // on Rock::IoState::writeableAnchor_ being set when we locked for writing.
    if (e.mem_obj && e.mem_obj->swapout.sio != NULL &&
            dynamic_cast<IoState&>(*e.mem_obj->swapout.sio).writeableAnchor_) {
        map->abortWriting(e.swap_filen);
        e.swap_dirn = -1;
        e.swap_filen = -1;
        e.swap_status = SWAPOUT_NONE;
        dynamic_cast<IoState&>(*e.mem_obj->swapout.sio).writeableAnchor_ = NULL;
        Store::Root().transientsAbandon(e); // broadcasts after the change
    } else {
        map->closeForReading(e.swap_filen);
        e.swap_dirn = -1;
        e.swap_filen = -1;
        e.swap_status = SWAPOUT_NONE;
    }
}
188
189 uint64_t
190 Rock::SwapDir::currentSize() const
191 {
192 const uint64_t spaceSize = !freeSlots ?
193 maxSize() : (slotSize * freeSlots->size());
194 // everything that is not free is in use
195 return maxSize() - spaceSize;
196 }
197
198 uint64_t
199 Rock::SwapDir::currentCount() const
200 {
201 return map ? map->entryCount() : 0;
202 }
203
204 /// In SMP mode only the disker process reports stats to avoid
205 /// counting the same stats by multiple processes.
206 bool
207 Rock::SwapDir::doReportStat() const
208 {
209 return ::SwapDir::doReportStat() && (!UsingSmp() || IamDiskProcess());
210 }
211
/// no-op: Rock derives usage stats on demand (see currentSize/currentCount)
void
Rock::SwapDir::swappedOut(const StoreEntry &)
{
    // stats are not stored but computed when needed
}
217
218 int64_t
219 Rock::SwapDir::slotLimitAbsolute() const
220 {
221 // the max value is an invalid one; all values must be below the limit
222 assert(std::numeric_limits<Ipc::StoreMapSliceId>::max() ==
223 std::numeric_limits<SlotId>::max());
224 return std::numeric_limits<SlotId>::max();
225 }
226
227 int64_t
228 Rock::SwapDir::slotLimitActual() const
229 {
230 const int64_t sWanted = (maxSize() - HeaderSize)/slotSize;
231 const int64_t sLimitLo = map ? map->sliceLimit() : 0; // dynamic shrinking unsupported
232 const int64_t sLimitHi = slotLimitAbsolute();
233 return min(max(sLimitLo, sWanted), sLimitHi);
234 }
235
236 int64_t
237 Rock::SwapDir::entryLimitActual() const
238 {
239 return min(slotLimitActual(), entryLimitAbsolute());
240 }
241
// TODO: encapsulate as a tool
/// Creates the cache_dir directory and the db file (if missing). In SMP
/// mode only the disker process performs the creation. Any failure is
/// fatal via createError().
void
Rock::SwapDir::create()
{
    assert(path);
    assert(filePath);

    if (UsingSmp() && !IamDiskProcess()) {
        debugs (47,3, HERE << "disker will create in " << path);
        return;
    }

    debugs (47,3, HERE << "creating in " << path);

    struct stat dir_sb;
    if (::stat(path, &dir_sb) == 0) {
        struct stat file_sb;
        if (::stat(filePath, &file_sb) == 0) {
            debugs (47, DBG_IMPORTANT, "Skipping existing Rock db: " << filePath);
            return;
        }
        // else the db file is not there or is not accessible, and we will try
        // to create it later below, generating a detailed error on failures.
    } else { // path does not exist or is inaccessible
        // If path exists but is not accessible, mkdir() below will fail, and
        // the admin should see the error and act accordingly, so there is
        // no need to distinguish ENOENT from other possible stat() errors.
        debugs (47, DBG_IMPORTANT, "Creating Rock db directory: " << path);
        const int res = mkdir(path, 0700);
        if (res != 0)
            createError("mkdir");
    }

    debugs (47, DBG_IMPORTANT, "Creating Rock db: " << filePath);
    const int swap = open(filePath, O_WRONLY|O_CREAT|O_TRUNC|O_BINARY, 0600);
    if (swap < 0)
        createError("create");

#if SLOWLY_FILL_WITH_ZEROS
    // debugging mode: zero the whole db instead of relying on sparse files
    char block[1024];
    Must(maxSize() % sizeof(block) == 0);
    memset(block, '\0', sizeof(block));

    for (off_t offset = 0; offset < maxSize(); offset += sizeof(block)) {
        if (write(swap, block, sizeof(block)) != sizeof(block))
            createError("write");
    }
#else
    // reserve the full db size, then zero just the header area
    if (ftruncate(swap, maxSize()) != 0)
        createError("truncate");

    char header[HeaderSize];
    memset(header, '\0', sizeof(header));
    if (write(swap, header, sizeof(header)) != sizeof(header))
        createError("write");
#endif

    close(swap);
}
301
// report Rock DB creation error and exit
/// \param msg the failed operation name (e.g., "mkdir", "write");
/// never returns -- terminates Squid via fatal()
void
Rock::SwapDir::createError(const char *const msg)
{
    debugs(47, DBG_CRITICAL, "ERROR: Failed to initialize Rock Store db in " <<
           filePath << "; " << msg << " error: " << xstrerror());
    fatal("Rock Store db creation error");
}
310
/// Attaches shared-memory structures (free-slot index and entry map),
/// instantiates the DiskIO strategy, opens the db file, and kicks off the
/// (asynchronous) index rebuild accounting.
void
Rock::SwapDir::init()
{
    debugs(47,2, HERE);

    // XXX: SwapDirs aren't refcounted. We make IORequestor calls, which
    // are refcounted. We up our count once to avoid implicit delete's.
    lock();

    // attach to the shared free-slot stack created by SwapDirRr::create()
    freeSlots = shm_old(Ipc::Mem::PageStack)(freeSlotsPath());

    Must(!map);
    map = new DirMap(inodeMapPath());
    map->cleaner = this; // we get noteFreeMapSlice() callbacks

    // SMP mode requires the inter-process IpcIo module; otherwise do
    // blocking disk I/O in-process
    const char *ioModule = needsDiskStrand() ? "IpcIo" : "Blocking";
    if (DiskIOModule *m = DiskIOModule::Find(ioModule)) {
        debugs(47,2, HERE << "Using DiskIO module: " << ioModule);
        io = m->createStrategy();
        io->init();
    } else {
        debugs(47, DBG_CRITICAL, "FATAL: Rock store is missing DiskIO module: " <<
               ioModule);
        fatal("Rock Store missing a required DiskIO module");
    }

    theFile = io->newFile(filePath);
    theFile->configure(fileConfig);
    theFile->open(O_RDWR, 0644, this); // we get ioCompletedNotification()

    // Increment early. Otherwise, if one SwapDir finishes rebuild before
    // others start, storeRebuildComplete() will think the rebuild is over!
    // TODO: move store_dirs_rebuilding hack to store modules that need it.
    ++StoreController::store_dirs_rebuilding;
}
346
347 bool
348 Rock::SwapDir::needsDiskStrand() const
349 {
350 const bool wontEvenWorkWithoutDisker = Config.workers > 1;
351 const bool wouldWorkBetterWithDisker = DiskIOModule::Find("IpcIo");
352 return InDaemonMode() && (wontEvenWorkWithoutDisker ||
353 wouldWorkBetterWithDisker);
354 }
355
/// Parses the cache_dir line for this rock dir: directory path, size,
/// and options. Called once per configured rock cache_dir.
void
Rock::SwapDir::parse(int anIndex, char *aPath)
{
    index = anIndex;

    path = xstrdup(aPath);

    // cache store is located at path/rock
    String fname(path);
    fname.append("/rock");
    filePath = xstrdup(fname.termedBuf());

    parseSize(false);
    parseOptions(0);

    // Current openForWriting() code overwrites the old slot if needed
    // and possible, so proactively removing old slots is probably useless.
    assert(!repl); // repl = createRemovalPolicy(Config.replPolicy);

    validateOptions();
}
377
/// Re-parses size and options on `squid -k reconfigure`; options that
/// cannot change dynamically only produce warnings (see parseSize() and
/// the parse*Option() helpers).
void
Rock::SwapDir::reconfigure()
{
    parseSize(true);
    parseOptions(1);
    // TODO: can we reconfigure the replacement policy (repl)?
    validateOptions();
}
386
/// parse maximum db disk size
/// \param reconfig when true, the size cannot change; mismatches only warn
void
Rock::SwapDir::parseSize(const bool reconfig)
{
    const int i = GetInteger();
    if (i < 0)
        fatal("negative Rock cache_dir size value");
    const uint64_t new_max_size =
        static_cast<uint64_t>(i) << 20; // MBytes to Bytes
    if (!reconfig)
        max_size = new_max_size;
    else if (new_max_size != max_size) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir '" << path << "' size "
               "cannot be changed dynamically, value left unchanged (" <<
               (max_size >> 20) << " MB)");
    }
}
404
/// Extends the base option parser with rock-specific slot-size, timeout,
/// and rate options. Returns nullptr if the base tree is not the expected
/// ConfigOptionVector type.
ConfigOption *
Rock::SwapDir::getOptionTree() const
{
    ConfigOption *copt = ::SwapDir::getOptionTree();
    ConfigOptionVector *vector = dynamic_cast<ConfigOptionVector*>(copt);
    if (vector) {
        // if copt is actually a ConfigOptionVector
        vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseSizeOption, &SwapDir::dumpSizeOption));
        vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseTimeOption, &SwapDir::dumpTimeOption));
        vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseRateOption, &SwapDir::dumpRateOption));
    } else {
        // we don't know how to handle copt, as it's not a ConfigOptionVector.
        // free it (and return nullptr)
        delete copt;
    }
    return vector;
}
422
423 bool
424 Rock::SwapDir::allowOptionReconfigure(const char *const option) const
425 {
426 return strcmp(option, "slot-size") != 0 &&
427 ::SwapDir::allowOptionReconfigure(option);
428 }
429
/// parses time-specific options; mimics ::SwapDir::optionObjectSizeParse()
/// \returns false when the option is not a time option (so other parsers
/// may try it); self-destructs on invalid values
bool
Rock::SwapDir::parseTimeOption(char const *option, const char *value, int reconfig)
{
    // TODO: ::SwapDir or, better, Config should provide time-parsing routines,
    // including time unit handling. Same for size and rate.

    time_msec_t *storedTime;
    if (strcmp(option, "swap-timeout") == 0)
        storedTime = &fileConfig.ioTimeout;
    else
        return false;

    if (!value)
        self_destruct();

    // TODO: handle time units and detect parsing errors better
    const int64_t parsedValue = strtoll(value, NULL, 10);
    if (parsedValue < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
        self_destruct();
    }

    const time_msec_t newTime = static_cast<time_msec_t>(parsedValue);

    // time options cannot be changed at reconfigure time; warn instead
    if (!reconfig)
        *storedTime = newTime;
    else if (*storedTime != newTime) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedTime);
    }

    return true;
}
465
/// reports time-specific options; mimics ::SwapDir::optionObjectSizeDump()
/// only dumps swap-timeout when it differs from the zero default
void
Rock::SwapDir::dumpTimeOption(StoreEntry * e) const
{
    if (fileConfig.ioTimeout)
        storeAppendPrintf(e, " swap-timeout=%" PRId64,
                          static_cast<int64_t>(fileConfig.ioTimeout));
}
474
475 /// parses rate-specific options; mimics ::SwapDir::optionObjectSizeParse()
476 bool
477 Rock::SwapDir::parseRateOption(char const *option, const char *value, int isaReconfig)
478 {
479 int *storedRate;
480 if (strcmp(option, "max-swap-rate") == 0)
481 storedRate = &fileConfig.ioRate;
482 else
483 return false;
484
485 if (!value)
486 self_destruct();
487
488 // TODO: handle time units and detect parsing errors better
489 const int64_t parsedValue = strtoll(value, NULL, 10);
490 if (parsedValue < 0) {
491 debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
492 self_destruct();
493 }
494
495 const int newRate = static_cast<int>(parsedValue);
496
497 if (newRate < 0) {
498 debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << newRate);
499 self_destruct();
500 }
501
502 if (!isaReconfig)
503 *storedRate = newRate;
504 else if (*storedRate != newRate) {
505 debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
506 << " cannot be changed dynamically, value left unchanged: " <<
507 *storedRate);
508 }
509
510 return true;
511 }
512
/// reports rate-specific options; mimics ::SwapDir::optionObjectSizeDump()
/// a negative ioRate means "no limit configured" and is not dumped
void
Rock::SwapDir::dumpRateOption(StoreEntry * e) const
{
    if (fileConfig.ioRate >= 0)
        storeAppendPrintf(e, " max-swap-rate=%d", fileConfig.ioRate);
}
520
521 /// parses size-specific options; mimics ::SwapDir::optionObjectSizeParse()
522 bool
523 Rock::SwapDir::parseSizeOption(char const *option, const char *value, int reconfig)
524 {
525 uint64_t *storedSize;
526 if (strcmp(option, "slot-size") == 0)
527 storedSize = &slotSize;
528 else
529 return false;
530
531 if (!value)
532 self_destruct();
533
534 // TODO: handle size units and detect parsing errors better
535 const uint64_t newSize = strtoll(value, NULL, 10);
536 if (newSize <= 0) {
537 debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must be positive; got: " << newSize);
538 self_destruct();
539 }
540
541 if (newSize <= sizeof(DbCellHeader)) {
542 debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must exceed " << sizeof(DbCellHeader) << "; got: " << newSize);
543 self_destruct();
544 }
545
546 if (!reconfig)
547 *storedSize = newSize;
548 else if (*storedSize != newSize) {
549 debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
550 << " cannot be changed dynamically, value left unchanged: " <<
551 *storedSize);
552 }
553
554 return true;
555 }
556
557 /// reports size-specific options; mimics ::SwapDir::optionObjectSizeDump()
558 void
559 Rock::SwapDir::dumpSizeOption(StoreEntry * e) const
560 {
561 storeAppendPrintf(e, " slot-size=%" PRId64, slotSize);
562 }
563
564 /// check the results of the configuration; only level-0 debugging works here
565 void
566 Rock::SwapDir::validateOptions()
567 {
568 if (slotSize <= 0)
569 fatal("Rock store requires a positive slot-size");
570
571 const int64_t maxSizeRoundingWaste = 1024 * 1024; // size is configured in MB
572 const int64_t slotSizeRoundingWaste = slotSize;
573 const int64_t maxRoundingWaste =
574 max(maxSizeRoundingWaste, slotSizeRoundingWaste);
575
576 // an entry consumes at least one slot; round up to reduce false warnings
577 const int64_t blockSize = static_cast<int64_t>(slotSize);
578 const int64_t maxObjSize = max(blockSize,
579 ((maxObjectSize()+blockSize-1)/blockSize)*blockSize);
580
581 // Does the "sfileno*max-size" limit match configured db capacity?
582 const double entriesMayOccupy = entryLimitAbsolute()*static_cast<double>(maxObjSize);
583 if (entriesMayOccupy + maxRoundingWaste < maxSize()) {
584 const int64_t diskWasteSize = maxSize() - static_cast<int64_t>(entriesMayOccupy);
585 debugs(47, DBG_CRITICAL, "WARNING: Rock cache_dir " << path << " wastes disk space due to entry limits:" <<
586 "\n\tconfigured db capacity: " << maxSize() << " bytes" <<
587 "\n\tconfigured db slot size: " << slotSize << " bytes" <<
588 "\n\tconfigured maximum entry size: " << maxObjectSize() << " bytes" <<
589 "\n\tmaximum number of cache_dir entries supported by Squid: " << entryLimitAbsolute() <<
590 "\n\tdisk space all entries may use: " << entriesMayOccupy << " bytes" <<
591 "\n\tdisk space wasted: " << diskWasteSize << " bytes");
592 }
593
594 // Does the "absolute slot count" limit match configured db capacity?
595 const double slotsMayOccupy = slotLimitAbsolute()*static_cast<double>(slotSize);
596 if (slotsMayOccupy + maxRoundingWaste < maxSize()) {
597 const int64_t diskWasteSize = maxSize() - static_cast<int64_t>(entriesMayOccupy);
598 debugs(47, DBG_CRITICAL, "WARNING: Rock cache_dir " << path << " wastes disk space due to slot limits:" <<
599 "\n\tconfigured db capacity: " << maxSize() << " bytes" <<
600 "\n\tconfigured db slot size: " << slotSize << " bytes" <<
601 "\n\tmaximum number of rock cache_dir slots supported by Squid: " << slotLimitAbsolute() <<
602 "\n\tdisk space all slots may use: " << slotsMayOccupy << " bytes" <<
603 "\n\tdisk space wasted: " << diskWasteSize << " bytes");
604 }
605 }
606
/// Starts the asynchronous db index rebuild. The store_dirs_rebuilding
/// counter was already incremented in init().
void
Rock::SwapDir::rebuild()
{
    //++StoreController::store_dirs_rebuilding; // see Rock::SwapDir::init()
    AsyncJob::Start(new Rebuild(this));
}
613
/// Whether this cache_dir can accept a new entry of the given size.
/// \param load set to the current I/O load when storing is possible
bool
Rock::SwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const
{
    // account for the per-slot metadata header when checking size limits
    if (!::SwapDir::canStore(e, sizeof(DbCellHeader)+diskSpaceNeeded, load))
        return false;

    if (!theFile || !theFile->canWrite())
        return false;

    if (!map)
        return false;

    // Do not start I/O transaction if there are less than 10% free pages left.
    // TODO: reserve page instead
    if (needsDiskStrand() &&
            Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
        debugs(47, 5, HERE << "too few shared pages for IPC I/O left");
        return false;
    }

    if (io->shedLoad())
        return false;

    load = io->load();
    return true;
}
640
/// Starts swapping an entry out: write-locks a map anchor for the entry key
/// and builds an IoState bound to that anchor and this db file.
/// \returns NULL when the db file is unusable or no anchor can be locked
StoreIOState::Pointer
Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
{
    if (!theFile || theFile->error()) {
        debugs(47,4, HERE << theFile);
        return NULL;
    }

    sfileno filen;
    Ipc::StoreMapAnchor *const slot =
        map->openForWriting(reinterpret_cast<const cache_key *>(e.key), filen);
    if (!slot) {
        debugs(47, 5, HERE << "map->add failed");
        return NULL;
    }

    assert(filen >= 0);
    slot->set(e); // copy the entry basics into the shared anchor

    // XXX: We rely on our caller, storeSwapOutStart(), to set e.fileno.
    // If that does not happen, the entry will not decrement the read level!

    Rock::SwapDir::Pointer self(this);
    IoState *sio = new IoState(self, &e, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = filen;
    sio->writeableAnchor_ = slot; // marks this sio as the writer (see disconnect())

    debugs(47,5, HERE << "dir " << index << " created new filen " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen << std::dec << " starting at " <<
           diskOffset(sio->swap_filen));

    sio->file(theFile);

    trackReferences(e);
    return sio;
}
680
681 int64_t
682 Rock::SwapDir::diskOffset(const SlotId sid) const
683 {
684 assert(sid >= 0);
685 return HeaderSize + slotSize*sid;
686 }
687
/// byte offset of the slot that corresponds to the given shared page;
/// page numbers are 1-based while slot IDs are 0-based (see noteFreeMapSlice())
int64_t
Rock::SwapDir::diskOffset(Ipc::Mem::PageId &pageId) const
{
    assert(pageId);
    return diskOffset(pageId.number - 1);
}
694
/// the first byte offset past the last possible slot in the db file
int64_t
Rock::SwapDir::diskOffsetLimit() const
{
    assert(map);
    return diskOffset(map->sliceLimit());
}
701
/// Obtains a db slot for writing: first from the shared free-slot stack,
/// then by purging an existing entry (its freed slot is delivered to us
/// via the noteFreeMapSlice() callback through waitingForPage).
/// \returns whether pageId now holds a usable slot
bool
Rock::SwapDir::useFreeSlot(Ipc::Mem::PageId &pageId)
{
    if (freeSlots->pop(pageId)) {
        debugs(47, 5, "got a previously free slot: " << pageId);
        return true;
    }

    // catch free slots delivered to noteFreeMapSlice()
    assert(!waitingForPage);
    waitingForPage = &pageId;
    if (map->purgeOne()) {
        assert(!waitingForPage); // noteFreeMapSlice() should have cleared it
        assert(pageId.set());
        debugs(47, 5, "got a previously busy slot: " << pageId);
        return true;
    }
    assert(waitingForPage == &pageId);
    waitingForPage = NULL; // no slot was freed; stop waiting

    debugs(47, 3, "cannot get a slot; entries: " << map->entryCount());
    return false;
}
725
726 bool
727 Rock::SwapDir::validSlotId(const SlotId slotId) const
728 {
729 return 0 <= slotId && slotId < slotLimitActual();
730 }
731
/// Map cleaner callback: a map slice was freed. Converts the 0-based slice
/// ID into a 1-based page number and either hands the page to the waiting
/// useFreeSlot() caller or returns it to the shared free-slot stack.
void
Rock::SwapDir::noteFreeMapSlice(const Ipc::StoreMapSliceId sliceId)
{
    Ipc::Mem::PageId pageId;
    pageId.pool = index+1;   // pool IDs are 1-based (see SwapDirRr::create())
    pageId.number = sliceId+1; // page numbers are 1-based; slice IDs are not
    if (waitingForPage) {
        *waitingForPage = pageId;
        waitingForPage = NULL;
    } else {
        freeSlots->push(pageId);
    }
}
745
// tries to open an old entry with swap_filen for reading
StoreIOState::Pointer
Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
{
    if (!theFile || theFile->error()) {
        debugs(47,4, HERE << theFile);
        return NULL;
    }

    // the entry must already be anchored to a db slot
    if (e.swap_filen < 0) {
        debugs(47,4, HERE << e);
        return NULL;
    }

    // Do not start I/O transaction if there are less than 10% free pages left.
    // TODO: reserve page instead
    if (needsDiskStrand() &&
            Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
        debugs(47, 5, HERE << "too few shared pages for IPC I/O left");
        return NULL;
    }

    // The are two ways an entry can get swap_filen: our get() locked it for
    // reading or our storeSwapOutStart() locked it for writing. Peeking at our
    // locked entry is safe, but no support for reading the entry we swap out.
    const Ipc::StoreMapAnchor *slot = map->peekAtReader(e.swap_filen);
    if (!slot)
        return NULL; // we were writing afterall

    Rock::SwapDir::Pointer self(this);
    IoState *sio = new IoState(self, &e, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = e.swap_filen;
    sio->readableAnchor_ = slot;
    sio->file(theFile);

    debugs(47,5, HERE << "dir " << index << " has old filen: " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen);

    assert(slot->sameKey(static_cast<const cache_key*>(e.key)));
    // For collapsed disk hits: e.swap_file_sz and slot->basics.swap_file_sz
    // may still be zero and basics.swap_file_sz may grow.
    assert(slot->basics.swap_file_sz >= e.swap_file_sz);

    return sio;
}
794
/// DiskFile callback: the db file finished opening. Fatal if the open
/// failed; otherwise logs the effective limits and starts the rebuild.
void
Rock::SwapDir::ioCompletedNotification()
{
    if (!theFile)
        fatalf("Rock cache_dir failed to initialize db file: %s", filePath);

    if (theFile->error())
        fatalf("Rock cache_dir at %s failed to open db file: %s", filePath,
               xstrerror());

    debugs(47, 2, "Rock cache_dir[" << index << "] limits: " <<
           std::setw(12) << maxSize() << " disk bytes, " <<
           std::setw(7) << map->entryLimit() << " entries, and " <<
           std::setw(7) << map->sliceLimit() << " slots");

    rebuild();
}
812
/// DiskFile callback: the db file has been closed; drop our reference to it
void
Rock::SwapDir::closeCompleted()
{
    theFile = NULL;
}
818
/// DiskFile callback: a slot read finished; advance the IoState offset on
/// success and forward the buffer to the reader.
void
Rock::SwapDir::readCompleted(const char *, int rlen, int errflag, RefCount< ::ReadRequest> r)
{
    ReadRequest *request = dynamic_cast<Rock::ReadRequest*>(r.getRaw());
    assert(request);
    IoState::Pointer sio = request->sio;

    if (errflag == DISK_OK && rlen > 0)
        sio->offset_ += rlen;

    sio->callReaderBack(r->buf, rlen);
}
831
/// DiskFile callback: a slot write finished. On success, publishes the
/// written slice in the shared map (and, at eof, the whole entry); on
/// failure, releases the reserved next slot and marks the entry broken.
void
Rock::SwapDir::writeCompleted(int errflag, size_t, RefCount< ::WriteRequest> r)
{
    Rock::WriteRequest *request = dynamic_cast<Rock::WriteRequest*>(r.getRaw());
    assert(request);
    assert(request->sio !=  NULL);
    IoState &sio = *request->sio;

    // quit if somebody called IoState::close() while we were waiting
    if (!sio.stillWaiting()) {
        debugs(79, 3, "ignoring closed entry " << sio.swap_filen);
        noteFreeMapSlice(request->sidNext); // return the unused reserved slot
        return;
    }

    // TODO: Fail if disk dropped one of the previous write requests.

    if (errflag == DISK_OK) {
        // do not increment sio.offset_ because we do it in sio->write()

        // finalize the shared slice info after writing slice contents to disk
        Ipc::StoreMap::Slice &slice =
            map->writeableSlice(sio.swap_filen, request->sidCurrent);
        slice.size = request->len - sizeof(DbCellHeader);
        slice.next = request->sidNext;

        if (request->eof) {
            assert(sio.e);
            assert(sio.writeableAnchor_);
            sio.e->swap_file_sz = sio.writeableAnchor_->basics.swap_file_sz =
                                      sio.offset_;

            // close, the entry gets the read lock
            map->closeForWriting(sio.swap_filen, true);
            sio.writeableAnchor_ = NULL;
            sio.finishedWriting(errflag);
        }
    } else {
        noteFreeMapSlice(request->sidNext); // return the unused reserved slot

        writeError(*sio.e);
        sio.finishedWriting(errflag);
        // and hope that Core will call disconnect() to close the map entry
    }

    // let other workers know the entry changed
    CollapsedForwarding::Broadcast(*sio.e);
}
879
/// Handles a failed slot write for the given entry: marks the map entry
/// unusable and abandons the corresponding transient entry.
void
Rock::SwapDir::writeError(StoreEntry &e)
{
    // Do not abortWriting here. The entry should keep the write lock
    // instead of losing association with the store and confusing core.
    map->freeEntry(e.swap_filen); // will mark as unusable, just in case

    Store::Root().transientsAbandon(e);

    // All callers must also call IoState callback, to propagate the error.
}
891
892 bool
893 Rock::SwapDir::full() const
894 {
895 return freeSlots != NULL && !freeSlots->size();
896 }
897
// storeSwapOutFileClosed calls this method on DISK_NO_SPACE_LEFT,
// but it should not happen for us because the db file is preallocated
void
Rock::SwapDir::diskFull()
{
    debugs(20, DBG_IMPORTANT, "BUG: No space left with rock cache_dir: " <<
           filePath);
}
906
/// purge while full(); it should be sufficient to purge just one
/// (currently a no-op; see the explanation below)
void
Rock::SwapDir::maintain()
{
    // The Store calls this to free some db space, but there is nothing wrong
    // with a full() db, except when db has to shrink after reconfigure, and
    // we do not support shrinking yet (it would have to purge specific slots).
    // TODO: Disable maintain() requests when they are pointless.
}
916
/// notifies the removal policy (if any) that the entry was referenced
void
Rock::SwapDir::reference(StoreEntry &e)
{
    debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
    if (repl && repl->Referenced)
        repl->Referenced(repl, &e, &e.repl);
}
924
/// notifies the removal policy (if any) that the entry lost a reference
/// \returns false: the entry need not stay in the global store_table
bool
Rock::SwapDir::dereference(StoreEntry &e, bool)
{
    debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
    if (repl && repl->Dereferenced)
        repl->Dereferenced(repl, &e, &e.repl);

    // no need to keep e in the global store_table for us; we have our own map
    return false;
}
935
936 bool
937 Rock::SwapDir::unlinkdUseful() const
938 {
939 // no entry-specific files to unlink
940 return false;
941 }
942
/// Removes the entry from this cache_dir: detaches it from the removal
/// policy, frees its map anchor, and releases this worker's map lock.
void
Rock::SwapDir::unlink(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    ignoreReferences(e);
    map->freeEntry(e.swap_filen);
    disconnect(e);
}
951
/// Marks the entry's map anchor for removal without releasing our lock
/// (unlike unlink(), does not disconnect the entry).
void
Rock::SwapDir::markForUnlink(StoreEntry &e)
{
    debugs(47, 5, e);
    map->freeEntry(e.swap_filen);
}
958
/// adds the entry to the removal policy (if one is configured)
void
Rock::SwapDir::trackReferences(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    if (repl)
        repl->Add(repl, &e, &e.repl);
}
966
/// removes the entry from the removal policy (if one is configured)
void
Rock::SwapDir::ignoreReferences(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    if (repl)
        repl->Remove(repl, &e, &e.repl);
}
974
/// Appends this cache_dir's statistics (sizes, entry/slot usage, lock
/// stats for small dirs, and flags) to the given cache manager report.
void
Rock::SwapDir::statfs(StoreEntry &e) const
{
    storeAppendPrintf(&e, "\n");
    storeAppendPrintf(&e, "Maximum Size: %" PRIu64 " KB\n", maxSize() >> 10);
    storeAppendPrintf(&e, "Current Size: %.2f KB %.2f%%\n",
                      currentSize() / 1024.0,
                      Math::doublePercent(currentSize(), maxSize()));

    const int entryLimit = entryLimitActual();
    const int slotLimit = slotLimitActual();
    storeAppendPrintf(&e, "Maximum entries: %9d\n", entryLimit);
    if (map && entryLimit > 0) {
        const int entryCount = map->entryCount();
        storeAppendPrintf(&e, "Current entries: %9d %.2f%%\n",
                          entryCount, (100.0 * entryCount / entryLimit));
    }

    storeAppendPrintf(&e, "Maximum slots:   %9d\n", slotLimit);
    if (map && slotLimit > 0) {
        // used = limit minus free; skip if the free count looks inconsistent
        const unsigned int slotsFree = !freeSlots ? 0 : freeSlots->size();
        if (slotsFree <= static_cast<const unsigned int>(slotLimit)) {
            const int usedSlots = slotLimit - static_cast<const int>(slotsFree);
            storeAppendPrintf(&e, "Used slots:      %9d %.2f%%\n",
                              usedSlots, (100.0 * usedSlots / slotLimit));
        }
        if (slotLimit < 100) { // XXX: otherwise too expensive to count
            Ipc::ReadWriteLockStats stats;
            map->updateStats(stats);
            stats.dump(e);
        }
    }

    storeAppendPrintf(&e, "Pending operations: %d out of %d\n",
                      store_open_disk_fd, Config.max_open_disk_fds);

    storeAppendPrintf(&e, "Flags:");

    if (flags.selected)
        storeAppendPrintf(&e, " SELECTED");

    if (flags.read_only)
        storeAppendPrintf(&e, " READ-ONLY");

    storeAppendPrintf(&e, "\n");

}
1022
1023 SBuf
1024 Rock::SwapDir::inodeMapPath() const
1025 {
1026 return Ipc::Mem::Segment::Name(SBuf(path), "map");
1027 }
1028
/// Shared memory segment name for this cache_dir's free-slot stack.
/// Note: returns a pointer into a function-local static buffer that is
/// overwritten on every call.
const char *
Rock::SwapDir::freeSlotsPath() const
{
    static String spacesPath;
    spacesPath = path;
    spacesPath.append("_spaces");
    return spacesPath.termedBuf();
}
1037
namespace Rock
{
/// registers SwapDirRr (below) with the registered-runners registry
RunnerRegistrationEntry(SwapDirRr);
}
1042
/// Creates the shared map and free-slot segments for every configured rock
/// cache_dir; the two owner vectors stay parallel (one element per dir).
void Rock::SwapDirRr::create()
{
    Must(mapOwners.empty() && freeSlotsOwners.empty());
    for (int i = 0; i < Config.cacheSwap.n_configured; ++i) {
        if (const Rock::SwapDir *const sd = dynamic_cast<Rock::SwapDir *>(INDEXSD(i))) {
            const int64_t capacity = sd->slotLimitActual();

            SwapDir::DirMap::Owner *const mapOwner =
                SwapDir::DirMap::Init(sd->inodeMapPath(), capacity);
            mapOwners.push_back(mapOwner);

            // TODO: somehow remove pool id and counters from PageStack?
            Ipc::Mem::Owner<Ipc::Mem::PageStack> *const freeSlotsOwner =
                shm_new(Ipc::Mem::PageStack)(sd->freeSlotsPath(),
                                             i+1, capacity, 0);
            freeSlotsOwners.push_back(freeSlotsOwner);

            // TODO: add method to initialize PageStack with no free pages
            // drain the stack so that it starts with zero free slots;
            // the rebuild will push back the slots that are actually free
            while (true) {
                Ipc::Mem::PageId pageId;
                if (!freeSlotsOwner->object()->pop(pageId))
                    break;
            }
        }
    }
}
1069
// destroys the per-dir segment owners; the two vectors are kept parallel
// by create(), so a single index walks both
Rock::SwapDirRr::~SwapDirRr()
{
    for (size_t i = 0; i < mapOwners.size(); ++i) {
        delete mapOwners[i];
        delete freeSlotsOwners[i];
    }
}
1077