/*
 * Copyright (C) 1996-2015 The Squid Software Foundation and contributors
 *
 * Squid software is distributed under GPLv2+ license and includes
 * contributions from numerous individuals and organizations.
 * Please see the COPYING and CONTRIBUTORS files for details.
 */

/* DEBUG: section 47 Store Directory Routines */

#include "squid.h"
#include "cache_cf.h"
#include "CollapsedForwarding.h"
#include "ConfigOption.h"
#include "DiskIO/DiskIOModule.h"
#include "DiskIO/DiskIOStrategy.h"
#include "DiskIO/ReadRequest.h"
#include "DiskIO/WriteRequest.h"
#include "fs/rock/RockIoRequests.h"
#include "fs/rock/RockIoState.h"
#include "fs/rock/RockRebuild.h"
#include "fs/rock/RockSwapDir.h"
#include "globals.h"
#include "ipc/mem/Pages.h"
#include "MemObject.h"
#include "Parsing.h"
#include "SquidConfig.h"
#include "SquidMath.h"
#include "tools.h"

#include <cstdlib>
#include <iomanip>
#include <limits>

#if HAVE_SYS_STAT_H
#include <sys/stat.h>
#endif

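// All slot data lives past a fixed-size header at the start of the db file;
// see diskOffset() below for the resulting on-disk layout.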
const int64_t Rock::SwapDir::HeaderSize = 16*1024;

Rock::SwapDir::SwapDir(): ::SwapDir("rock"),
    slotSize(HeaderSize), filePath(NULL), map(NULL), io(NULL),
    waitingForPage(NULL)
{
}

Rock::SwapDir::~SwapDir()
{
    delete io;
    delete map;
    safe_free(filePath);
}

// called when Squid core needs a StoreEntry with a given key
StoreEntry *
Rock::SwapDir::get(const cache_key *key)
{
    if (!map || !theFile || !theFile->canRead())
        return NULL;

    sfileno filen;
    const Ipc::StoreMapAnchor *const slot = map->openForReading(key, filen);
    if (!slot)
        return NULL;

    // create a brand new store entry and initialize it with stored basics
    StoreEntry *e = new StoreEntry();
    anchorEntry(*e, filen, *slot);

    e->hashInsert(key);
    trackReferences(*e);

    return e;
    // the disk entry remains open for reading, protected from modifications
}

bool
Rock::SwapDir::anchorCollapsed(StoreEntry &collapsed, bool &inSync)
{
    if (!map || !theFile || !theFile->canRead())
        return false;

    sfileno filen;
    const Ipc::StoreMapAnchor *const slot = map->openForReading(
            reinterpret_cast<cache_key*>(collapsed.key), filen);
    if (!slot)
        return false;

    anchorEntry(collapsed, filen, *slot);
    inSync = updateCollapsedWith(collapsed, *slot);
    return true; // even if inSync is false
}

bool
Rock::SwapDir::updateCollapsed(StoreEntry &collapsed)
{
    if (!map || !theFile || !theFile->canRead())
        return false;

    if (collapsed.swap_filen < 0) // no longer using a disk cache
        return true;
    assert(collapsed.swap_dirn == index);

    const Ipc::StoreMapAnchor &s = map->readableEntry(collapsed.swap_filen);
    return updateCollapsedWith(collapsed, s);
}

bool
Rock::SwapDir::updateCollapsedWith(StoreEntry &collapsed, const Ipc::StoreMapAnchor &anchor)
{
    collapsed.swap_file_sz = anchor.basics.swap_file_sz;
    return true;
}

void
Rock::SwapDir::anchorEntry(StoreEntry &e, const sfileno filen, const Ipc::StoreMapAnchor &anchor)
{
    const Ipc::StoreMapAnchor::Basics &basics = anchor.basics;

    e.swap_file_sz = basics.swap_file_sz;
    e.lastref = basics.lastref;
    e.timestamp = basics.timestamp;
    e.expires = basics.expires;
    e.lastmod = basics.lastmod;
    e.refcount = basics.refcount;
    e.flags = basics.flags;

    if (anchor.complete()) {
        e.store_status = STORE_OK;
        e.swap_status = SWAPOUT_DONE;
    } else {
        e.store_status = STORE_PENDING;
        e.swap_status = SWAPOUT_WRITING; // even though another worker writes?
    }

    e.ping_status = PING_NONE;

    EBIT_CLR(e.flags, RELEASE_REQUEST);
    EBIT_CLR(e.flags, KEY_PRIVATE);
    EBIT_SET(e.flags, ENTRY_VALIDATED);

    e.swap_dirn = index;
    e.swap_filen = filen;
}

void Rock::SwapDir::disconnect(StoreEntry &e)
{
    assert(e.swap_dirn == index);
    assert(e.swap_filen >= 0);
    // cannot have SWAPOUT_NONE entry with swap_filen >= 0
    assert(e.swap_status != SWAPOUT_NONE);

    // do not rely on e.swap_status here because there is an async delay
    // before it switches from SWAPOUT_WRITING to SWAPOUT_DONE.

    // since e has swap_filen, its slot is locked for reading and/or writing
    // but it is difficult to know whether THIS worker is reading or writing e,
    // especially since we may switch from writing to reading. This code relies
    // on Rock::IoState::writeableAnchor_ being set when we locked for writing.
    if (e.mem_obj && e.mem_obj->swapout.sio != NULL &&
            dynamic_cast<IoState&>(*e.mem_obj->swapout.sio).writeableAnchor_) {
        map->abortWriting(e.swap_filen);
        e.swap_dirn = -1;
        e.swap_filen = -1;
        e.swap_status = SWAPOUT_NONE;
        dynamic_cast<IoState&>(*e.mem_obj->swapout.sio).writeableAnchor_ = NULL;
        Store::Root().transientsAbandon(e); // broadcasts after the change
    } else {
        map->closeForReading(e.swap_filen);
        e.swap_dirn = -1;
        e.swap_filen = -1;
        e.swap_status = SWAPOUT_NONE;
    }
}

uint64_t
Rock::SwapDir::currentSize() const
{
    const uint64_t spaceSize = !freeSlots ?
                               maxSize() : (slotSize * freeSlots->size());
    // everything that is not free is in use
    return maxSize() - spaceSize;
}
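// Example with hypothetical numbers: a 100 MB cache_dir with 16 KB slots and
// 1024 slots still free reports 100 MB - 16 MB = 84 MB as currentSize().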

uint64_t
Rock::SwapDir::currentCount() const
{
    return map ? map->entryCount() : 0;
}

/// In SMP mode, only the disker process reports stats to avoid having
/// multiple processes count the same stats.
bool
Rock::SwapDir::doReportStat() const
{
    return ::SwapDir::doReportStat() && (!UsingSmp() || IamDiskProcess());
}

void
Rock::SwapDir::swappedOut(const StoreEntry &)
{
    // stats are not stored but computed when needed
}

int64_t
Rock::SwapDir::slotLimitAbsolute() const
{
    // the max value is an invalid one; all values must be below the limit
    assert(std::numeric_limits<Ipc::StoreMapSliceId>::max() ==
           std::numeric_limits<SlotId>::max());
    return std::numeric_limits<SlotId>::max();
}

int64_t
Rock::SwapDir::slotLimitActual() const
{
    const int64_t sWanted = (maxSize() - HeaderSize)/slotSize;
    const int64_t sLimitLo = map ? map->sliceLimit() : 0; // dynamic shrinking unsupported
    const int64_t sLimitHi = slotLimitAbsolute();
    return min(max(sLimitLo, sWanted), sLimitHi);
}
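// Example: a 1 GB cache_dir with the default 16 KB slots wants
// (2^30 - 16384)/16384 = 65535 slots, well below slotLimitAbsolute().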

int64_t
Rock::SwapDir::entryLimitActual() const
{
    return min(slotLimitActual(), entryLimitAbsolute());
}

// TODO: encapsulate as a tool
void
Rock::SwapDir::create()
{
    assert(path);
    assert(filePath);

    if (UsingSmp() && !IamDiskProcess()) {
        debugs(47, 3, HERE << "disker will create in " << path);
        return;
    }

    debugs(47, 3, HERE << "creating in " << path);

    struct stat dir_sb;
    if (::stat(path, &dir_sb) == 0) {
        struct stat file_sb;
        if (::stat(filePath, &file_sb) == 0) {
            debugs(47, DBG_IMPORTANT, "Skipping existing Rock db: " << filePath);
            return;
        }
        // else the db file is not there or is not accessible, and we will try
        // to create it later below, generating a detailed error on failures.
    } else { // path does not exist or is inaccessible
        // If path exists but is not accessible, mkdir() below will fail, and
        // the admin should see the error and act accordingly, so there is
        // no need to distinguish ENOENT from other possible stat() errors.
        debugs(47, DBG_IMPORTANT, "Creating Rock db directory: " << path);
        const int res = mkdir(path, 0700);
        if (res != 0)
            createError("mkdir");
    }

    debugs(47, DBG_IMPORTANT, "Creating Rock db: " << filePath);
    const int swap = open(filePath, O_WRONLY|O_CREAT|O_TRUNC|O_BINARY, 0600);
    if (swap < 0)
        createError("create");

#if SLOWLY_FILL_WITH_ZEROS
    char block[1024];
    Must(maxSize() % sizeof(block) == 0);
    memset(block, '\0', sizeof(block));

    for (off_t offset = 0; offset < maxSize(); offset += sizeof(block)) {
        if (write(swap, block, sizeof(block)) != sizeof(block))
            createError("write");
    }
#else
    if (ftruncate(swap, maxSize()) != 0)
        createError("truncate");

    char header[HeaderSize];
    memset(header, '\0', sizeof(header));
    if (write(swap, header, sizeof(header)) != sizeof(header))
        createError("write");
#endif

    close(swap);
}
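// The result is a <path>/rock file of exactly maxSize() bytes: sparse after
// ftruncate(), except for the zero-filled HeaderSize prefix written above.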

// report Rock DB creation error and exit
void
Rock::SwapDir::createError(const char *const msg)
{
    debugs(47, DBG_CRITICAL, "ERROR: Failed to initialize Rock Store db in " <<
           filePath << "; " << msg << " error: " << xstrerror());
    fatal("Rock Store db creation error");
}

void
Rock::SwapDir::init()
{
    debugs(47,2, HERE);

    // XXX: SwapDirs aren't refcounted. We make IORequestor calls, which
    // are refcounted. We up our count once to avoid implicit delete's.
    lock();

    freeSlots = shm_old(Ipc::Mem::PageStack)(freeSlotsPath());

    Must(!map);
    map = new DirMap(inodeMapPath());
    map->cleaner = this;

    const char *ioModule = needsDiskStrand() ? "IpcIo" : "Blocking";
    if (DiskIOModule *m = DiskIOModule::Find(ioModule)) {
        debugs(47,2, HERE << "Using DiskIO module: " << ioModule);
        io = m->createStrategy();
        io->init();
    } else {
        debugs(47, DBG_CRITICAL, "FATAL: Rock store is missing DiskIO module: " <<
               ioModule);
        fatal("Rock Store missing a required DiskIO module");
    }

    theFile = io->newFile(filePath);
    theFile->configure(fileConfig);
    theFile->open(O_RDWR, 0644, this);

    // Increment early. Otherwise, if one SwapDir finishes rebuild before
    // others start, storeRebuildComplete() will think the rebuild is over!
    // TODO: move store_dirs_rebuilding hack to store modules that need it.
    ++StoreController::store_dirs_rebuilding;
}

bool
Rock::SwapDir::needsDiskStrand() const
{
    const bool wontEvenWorkWithoutDisker = Config.workers > 1;
    const bool wouldWorkBetterWithDisker = DiskIOModule::Find("IpcIo");
    return InDaemonMode() && (wontEvenWorkWithoutDisker ||
                              wouldWorkBetterWithDisker);
}
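// In short: non-daemon modes (e.g., "squid -z") do their own disk I/O; a
// daemon uses a dedicated disker process whenever the IpcIo module is
// available; and SMP configurations (workers > 1) cannot work without one.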

void
Rock::SwapDir::parse(int anIndex, char *aPath)
{
    index = anIndex;

    path = xstrdup(aPath);

    // cache store is located at path/rock
    String fname(path);
    fname.append("/rock");
    filePath = xstrdup(fname.termedBuf());

    parseSize(false);
    parseOptions(0);

    // Current openForWriting() code overwrites the old slot if needed
    // and possible, so proactively removing old slots is probably useless.
    assert(!repl); // repl = createRemovalPolicy(Config.replPolicy);

    validateOptions();
}
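// A matching squid.conf line, with hypothetical values:
//   cache_dir rock /var/spool/squid-rock 512 slot-size=16384 swap-timeout=300 max-swap-rate=200
// parseSize() consumes the "512" (MB) and parseOptions() dispatches the rest
// to the option parsers below.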

void
Rock::SwapDir::reconfigure()
{
    parseSize(true);
    parseOptions(1);
    // TODO: can we reconfigure the replacement policy (repl)?
    validateOptions();
}

/// parse maximum db disk size
void
Rock::SwapDir::parseSize(const bool reconfig)
{
    const int i = GetInteger();
    if (i < 0)
        fatal("negative Rock cache_dir size value");
    const uint64_t new_max_size =
        static_cast<uint64_t>(i) << 20; // MBytes to Bytes
    if (!reconfig)
        max_size = new_max_size;
    else if (new_max_size != max_size) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir '" << path << "' size "
               "cannot be changed dynamically, value left unchanged (" <<
               (max_size >> 20) << " MB)");
    }
}
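// The shift above is plain MiB-to-bytes arithmetic: a configured size of
// 512 becomes 512 << 20 = 536870912 bytes.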

ConfigOption *
Rock::SwapDir::getOptionTree() const
{
    ConfigOption *copt = ::SwapDir::getOptionTree();
    ConfigOptionVector *vector = dynamic_cast<ConfigOptionVector*>(copt);
    if (vector) {
        // if copt is actually a ConfigOptionVector
        vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseSizeOption, &SwapDir::dumpSizeOption));
        vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseTimeOption, &SwapDir::dumpTimeOption));
        vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseRateOption, &SwapDir::dumpRateOption));
    } else {
        // we don't know how to handle copt, as it's not a ConfigOptionVector.
        // free it (and return nullptr)
        delete copt;
        copt = nullptr;
    }
    return copt;
}

bool
Rock::SwapDir::allowOptionReconfigure(const char *const option) const
{
    return strcmp(option, "slot-size") != 0 &&
           ::SwapDir::allowOptionReconfigure(option);
}

/// parses time-specific options; mimics ::SwapDir::optionObjectSizeParse()
bool
Rock::SwapDir::parseTimeOption(char const *option, const char *value, int reconfig)
{
    // TODO: ::SwapDir or, better, Config should provide time-parsing routines,
    // including time unit handling. Same for size and rate.

    time_msec_t *storedTime;
    if (strcmp(option, "swap-timeout") == 0)
        storedTime = &fileConfig.ioTimeout;
    else
        return false;

    if (!value)
        self_destruct();

    // TODO: handle time units and detect parsing errors better
    const int64_t parsedValue = strtoll(value, NULL, 10);
    if (parsedValue < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
        self_destruct();
    }

    const time_msec_t newTime = static_cast<time_msec_t>(parsedValue);

    if (!reconfig)
        *storedTime = newTime;
    else if (*storedTime != newTime) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedTime);
    }

    return true;
}

/// reports time-specific options; mimics ::SwapDir::optionObjectSizeDump()
void
Rock::SwapDir::dumpTimeOption(StoreEntry * e) const
{
    if (fileConfig.ioTimeout)
        storeAppendPrintf(e, " swap-timeout=%" PRId64,
                          static_cast<int64_t>(fileConfig.ioTimeout));
}

/// parses rate-specific options; mimics ::SwapDir::optionObjectSizeParse()
bool
Rock::SwapDir::parseRateOption(char const *option, const char *value, int isaReconfig)
{
    int *storedRate;
    if (strcmp(option, "max-swap-rate") == 0)
        storedRate = &fileConfig.ioRate;
    else
        return false;

    if (!value)
        self_destruct();

    // TODO: handle time units and detect parsing errors better
    const int64_t parsedValue = strtoll(value, NULL, 10);
    if (parsedValue < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
        self_destruct();
    }

    const int newRate = static_cast<int>(parsedValue);

    if (newRate < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << newRate);
        self_destruct();
    }

    if (!isaReconfig)
        *storedRate = newRate;
    else if (*storedRate != newRate) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedRate);
    }

    return true;
}

/// reports rate-specific options; mimics ::SwapDir::optionObjectSizeDump()
void
Rock::SwapDir::dumpRateOption(StoreEntry * e) const
{
    if (fileConfig.ioRate >= 0)
        storeAppendPrintf(e, " max-swap-rate=%d", fileConfig.ioRate);
}

/// parses size-specific options; mimics ::SwapDir::optionObjectSizeParse()
bool
Rock::SwapDir::parseSizeOption(char const *option, const char *value, int reconfig)
{
    uint64_t *storedSize;
    if (strcmp(option, "slot-size") == 0)
        storedSize = &slotSize;
    else
        return false;

    if (!value)
        self_destruct();

    // TODO: handle size units and detect parsing errors better
    // parse into a signed type first so that negative values are rejected
    // instead of silently wrapping into huge unsigned sizes
    const int64_t parsedValue = strtoll(value, NULL, 10);
    if (parsedValue <= 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must be positive; got: " << parsedValue);
        self_destruct();
    }

    const uint64_t newSize = static_cast<uint64_t>(parsedValue);

    if (newSize <= sizeof(DbCellHeader)) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must exceed " << sizeof(DbCellHeader) << "; got: " << newSize);
        self_destruct();
    }

    if (!reconfig)
        *storedSize = newSize;
    else if (*storedSize != newSize) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedSize);
    }

    return true;
}

/// reports size-specific options; mimics ::SwapDir::optionObjectSizeDump()
void
Rock::SwapDir::dumpSizeOption(StoreEntry * e) const
{
    storeAppendPrintf(e, " slot-size=%" PRIu64, slotSize);
}

/// check the results of the configuration; only level-0 debugging works here
void
Rock::SwapDir::validateOptions()
{
    if (slotSize <= 0)
        fatal("Rock store requires a positive slot-size");

    const int64_t maxSizeRoundingWaste = 1024 * 1024; // size is configured in MB
    const int64_t slotSizeRoundingWaste = slotSize;
    const int64_t maxRoundingWaste =
        max(maxSizeRoundingWaste, slotSizeRoundingWaste);

    // an entry consumes at least one slot; round up to reduce false warnings
    const int64_t blockSize = static_cast<int64_t>(slotSize);
    const int64_t maxObjSize = max(blockSize,
                                   ((maxObjectSize()+blockSize-1)/blockSize)*blockSize);

    // Does the "sfileno*max-size" limit match configured db capacity?
    const double entriesMayOccupy = entryLimitAbsolute()*static_cast<double>(maxObjSize);
    if (entriesMayOccupy + maxRoundingWaste < maxSize()) {
        const int64_t diskWasteSize = maxSize() - static_cast<int64_t>(entriesMayOccupy);
        debugs(47, DBG_CRITICAL, "WARNING: Rock cache_dir " << path << " wastes disk space due to entry limits:" <<
               "\n\tconfigured db capacity: " << maxSize() << " bytes" <<
               "\n\tconfigured db slot size: " << slotSize << " bytes" <<
               "\n\tconfigured maximum entry size: " << maxObjectSize() << " bytes" <<
               "\n\tmaximum number of cache_dir entries supported by Squid: " << entryLimitAbsolute() <<
               "\n\tdisk space all entries may use: " << entriesMayOccupy << " bytes" <<
               "\n\tdisk space wasted: " << diskWasteSize << " bytes");
    }

    // Does the "absolute slot count" limit match configured db capacity?
    const double slotsMayOccupy = slotLimitAbsolute()*static_cast<double>(slotSize);
    if (slotsMayOccupy + maxRoundingWaste < maxSize()) {
        const int64_t diskWasteSize = maxSize() - static_cast<int64_t>(slotsMayOccupy);
        debugs(47, DBG_CRITICAL, "WARNING: Rock cache_dir " << path << " wastes disk space due to slot limits:" <<
               "\n\tconfigured db capacity: " << maxSize() << " bytes" <<
               "\n\tconfigured db slot size: " << slotSize << " bytes" <<
               "\n\tmaximum number of rock cache_dir slots supported by Squid: " << slotLimitAbsolute() <<
               "\n\tdisk space all slots may use: " << slotsMayOccupy << " bytes" <<
               "\n\tdisk space wasted: " << diskWasteSize << " bytes");
    }
}
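// Example of the first warning, assuming the usual 24-bit sfileno limit of
// about 16.7M entries: with 16 KB slots and a 16 KB maximum object size,
// entries can occupy at most about 256 GB, so a noticeably larger db can
// never be filled.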

void
Rock::SwapDir::rebuild()
{
    //++StoreController::store_dirs_rebuilding; // see Rock::SwapDir::init()
    AsyncJob::Start(new Rebuild(this));
}

bool
Rock::SwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const
{
    if (!::SwapDir::canStore(e, sizeof(DbCellHeader)+diskSpaceNeeded, load))
        return false;

    if (!theFile || !theFile->canWrite())
        return false;

    if (!map)
        return false;

    // Do not start I/O transaction if there are less than 10% free pages left.
    // TODO: reserve page instead
    if (needsDiskStrand() &&
            Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
        debugs(47, 5, HERE << "too few shared pages for IPC I/O left");
        return false;
    }

    if (io->shedLoad())
        return false;

    load = io->load();
    return true;
}

StoreIOState::Pointer
Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
{
    if (!theFile || theFile->error()) {
        debugs(47,4, HERE << theFile);
        return NULL;
    }

    sfileno filen;
    Ipc::StoreMapAnchor *const slot =
        map->openForWriting(reinterpret_cast<const cache_key *>(e.key), filen);
    if (!slot) {
        debugs(47, 5, HERE << "map->add failed");
        return NULL;
    }

    assert(filen >= 0);
    slot->set(e);

    // XXX: We rely on our caller, storeSwapOutStart(), to set e.fileno.
    // If that does not happen, the entry will not decrement the read level!

    Rock::SwapDir::Pointer self(this);
    IoState *sio = new IoState(self, &e, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = filen;
    sio->writeableAnchor_ = slot;

    debugs(47,5, HERE << "dir " << index << " created new filen " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen << std::dec << " starting at " <<
           diskOffset(sio->swap_filen));

    sio->file(theFile);

    trackReferences(e);
    return sio;
}

int64_t
Rock::SwapDir::diskOffset(const SlotId sid) const
{
    assert(sid >= 0);
    return HeaderSize + slotSize*sid;
}

int64_t
Rock::SwapDir::diskOffset(Ipc::Mem::PageId &pageId) const
{
    assert(pageId);
    return diskOffset(pageId.number - 1);
}

int64_t
Rock::SwapDir::diskOffsetLimit() const
{
    assert(map);
    return diskOffset(map->sliceLimit());
}
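// Layout example with the default 16 KB slots: slot 0 starts at byte 16384
// (right after the header), slot 1 at 32768, and so on. Free-page numbers
// are 1-based, hence the pageId.number - 1 conversion above.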

bool
Rock::SwapDir::useFreeSlot(Ipc::Mem::PageId &pageId)
{
    if (freeSlots->pop(pageId)) {
        debugs(47, 5, "got a previously free slot: " << pageId);
        return true;
    }

    // catch free slots delivered to noteFreeMapSlice()
    assert(!waitingForPage);
    waitingForPage = &pageId;
    if (map->purgeOne()) {
        assert(!waitingForPage); // noteFreeMapSlice() should have cleared it
        assert(pageId.set());
        debugs(47, 5, "got a previously busy slot: " << pageId);
        return true;
    }
    assert(waitingForPage == &pageId);
    waitingForPage = NULL;

    debugs(47, 3, "cannot get a slot; entries: " << map->entryCount());
    return false;
}
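// The purge handshake in slow motion: useFreeSlot() arms waitingForPage,
// map->purgeOne() evicts an entry and frees its slices, and the map then
// calls our noteFreeMapSlice(), which hands the first freed slot straight
// to the waiting caller instead of pushing it onto the shared stack.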

bool
Rock::SwapDir::validSlotId(const SlotId slotId) const
{
    return 0 <= slotId && slotId < slotLimitActual();
}

void
Rock::SwapDir::noteFreeMapSlice(const Ipc::StoreMapSliceId sliceId)
{
    Ipc::Mem::PageId pageId;
    pageId.pool = index+1;
    pageId.number = sliceId+1;
    if (waitingForPage) {
        *waitingForPage = pageId;
        waitingForPage = NULL;
    } else {
        freeSlots->push(pageId);
    }
}
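// Both conversions above are 1-based on purpose: pool 0 and page number 0
// mean "unset" to Ipc::Mem::PageId (see PageId::set()), so slice S of dir D
// travels as pool D+1, number S+1; diskOffset() undoes the number offset.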

// tries to open an old entry with swap_filen for reading
StoreIOState::Pointer
Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
{
    if (!theFile || theFile->error()) {
        debugs(47,4, HERE << theFile);
        return NULL;
    }

    if (e.swap_filen < 0) {
        debugs(47,4, HERE << e);
        return NULL;
    }

    // Do not start I/O transaction if there are less than 10% free pages left.
    // TODO: reserve page instead
    if (needsDiskStrand() &&
            Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
        debugs(47, 5, HERE << "too few shared pages for IPC I/O left");
        return NULL;
    }

    // There are two ways an entry can get swap_filen: our get() locked it for
    // reading, or our storeSwapOutStart() locked it for writing. Peeking at
    // our own locked entry is safe, but reading an entry we are still
    // swapping out is not supported.
    const Ipc::StoreMapAnchor *slot = map->peekAtReader(e.swap_filen);
    if (!slot)
        return NULL; // we were writing, after all

    Rock::SwapDir::Pointer self(this);
    IoState *sio = new IoState(self, &e, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = e.swap_filen;
    sio->readableAnchor_ = slot;
    sio->file(theFile);

    debugs(47,5, HERE << "dir " << index << " has old filen: " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen);

    assert(slot->sameKey(static_cast<const cache_key*>(e.key)));
    // For collapsed disk hits: e.swap_file_sz and slot->basics.swap_file_sz
    // may still be zero and basics.swap_file_sz may grow.
    assert(slot->basics.swap_file_sz >= e.swap_file_sz);

    return sio;
}

void
Rock::SwapDir::ioCompletedNotification()
{
    if (!theFile)
        fatalf("Rock cache_dir failed to initialize db file: %s", filePath);

    if (theFile->error())
        fatalf("Rock cache_dir at %s failed to open db file: %s", filePath,
               xstrerror());

    debugs(47, 2, "Rock cache_dir[" << index << "] limits: " <<
           std::setw(12) << maxSize() << " disk bytes, " <<
           std::setw(7) << map->entryLimit() << " entries, and " <<
           std::setw(7) << map->sliceLimit() << " slots");

    rebuild();
}

void
Rock::SwapDir::closeCompleted()
{
    theFile = NULL;
}

void
Rock::SwapDir::readCompleted(const char *, int rlen, int errflag, RefCount< ::ReadRequest> r)
{
    ReadRequest *request = dynamic_cast<Rock::ReadRequest*>(r.getRaw());
    assert(request);
    IoState::Pointer sio = request->sio;

    if (errflag == DISK_OK && rlen > 0)
        sio->offset_ += rlen;

    sio->callReaderBack(r->buf, rlen);
}

void
Rock::SwapDir::writeCompleted(int errflag, size_t, RefCount< ::WriteRequest> r)
{
    Rock::WriteRequest *request = dynamic_cast<Rock::WriteRequest*>(r.getRaw());
    assert(request);
    assert(request->sio != NULL);
    IoState &sio = *request->sio;

    // quit if somebody called IoState::close() while we were waiting
    if (!sio.stillWaiting()) {
        debugs(79, 3, "ignoring closed entry " << sio.swap_filen);
        noteFreeMapSlice(request->sidNext);
        return;
    }

    // TODO: Fail if disk dropped one of the previous write requests.

    if (errflag == DISK_OK) {
        // do not increment sio.offset_ because we do it in sio->write()

        // finalize the shared slice info after writing slice contents to disk
        Ipc::StoreMap::Slice &slice =
            map->writeableSlice(sio.swap_filen, request->sidCurrent);
        slice.size = request->len - sizeof(DbCellHeader);
        slice.next = request->sidNext;

        if (request->eof) {
            assert(sio.e);
            assert(sio.writeableAnchor_);
            sio.e->swap_file_sz = sio.writeableAnchor_->basics.swap_file_sz =
                                      sio.offset_;

            // close for writing; the "true" argument keeps a read lock
            map->closeForWriting(sio.swap_filen, true);
            sio.writeableAnchor_ = NULL;
            sio.finishedWriting(errflag);
        }
    } else {
        noteFreeMapSlice(request->sidNext);

        writeError(*sio.e);
        sio.finishedWriting(errflag);
        // and hope that Core will call disconnect() to close the map entry
    }

    CollapsedForwarding::Broadcast(*sio.e);
}
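// On disk, an entry is a chain of slices: each completed write publishes the
// current slice's size and its successor (sidNext), and the final (eof) write
// records the total swap_file_sz and downgrades the write lock to a read lock.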

void
Rock::SwapDir::writeError(StoreEntry &e)
{
    // Do not abortWriting here. The entry should keep the write lock
    // instead of losing association with the store and confusing core.
    map->freeEntry(e.swap_filen); // will mark as unusable, just in case

    Store::Root().transientsAbandon(e);

    // All callers must also call IoState callback, to propagate the error.
}

bool
Rock::SwapDir::full() const
{
    return freeSlots != NULL && !freeSlots->size();
}

// storeSwapOutFileClosed() calls this method on DISK_NO_SPACE_LEFT,
// but that should not happen to us
void
Rock::SwapDir::diskFull()
{
    debugs(20, DBG_IMPORTANT, "BUG: No space left with rock cache_dir: " <<
           filePath);
}

/// purge while full(); it should be sufficient to purge just one
void
Rock::SwapDir::maintain()
{
    // The Store calls this to free some db space, but there is nothing wrong
    // with a full() db, except when db has to shrink after reconfigure, and
    // we do not support shrinking yet (it would have to purge specific slots).
    // TODO: Disable maintain() requests when they are pointless.
}

void
Rock::SwapDir::reference(StoreEntry &e)
{
    debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
    if (repl && repl->Referenced)
        repl->Referenced(repl, &e, &e.repl);
}

bool
Rock::SwapDir::dereference(StoreEntry &e)
{
    debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
    if (repl && repl->Dereferenced)
        repl->Dereferenced(repl, &e, &e.repl);

    // no need to keep e in the global store_table for us; we have our own map
    return false;
}

bool
Rock::SwapDir::unlinkdUseful() const
{
    // no entry-specific files to unlink
    return false;
}

void
Rock::SwapDir::unlink(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    ignoreReferences(e);
    map->freeEntry(e.swap_filen);
    disconnect(e);
}

void
Rock::SwapDir::markForUnlink(StoreEntry &e)
{
    debugs(47, 5, e);
    map->freeEntry(e.swap_filen);
}

void
Rock::SwapDir::trackReferences(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    if (repl)
        repl->Add(repl, &e, &e.repl);
}

void
Rock::SwapDir::ignoreReferences(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    if (repl)
        repl->Remove(repl, &e, &e.repl);
}

void
Rock::SwapDir::statfs(StoreEntry &e) const
{
    storeAppendPrintf(&e, "\n");
    storeAppendPrintf(&e, "Maximum Size: %" PRIu64 " KB\n", maxSize() >> 10);
    storeAppendPrintf(&e, "Current Size: %.2f KB %.2f%%\n",
                      currentSize() / 1024.0,
                      Math::doublePercent(currentSize(), maxSize()));

    const int entryLimit = entryLimitActual();
    const int slotLimit = slotLimitActual();
    storeAppendPrintf(&e, "Maximum entries: %9d\n", entryLimit);
    if (map && entryLimit > 0) {
        const int entryCount = map->entryCount();
        storeAppendPrintf(&e, "Current entries: %9d %.2f%%\n",
                          entryCount, (100.0 * entryCount / entryLimit));
    }

    storeAppendPrintf(&e, "Maximum slots: %9d\n", slotLimit);
    if (map && slotLimit > 0) {
        const unsigned int slotsFree = !freeSlots ? 0 : freeSlots->size();
        if (slotsFree <= static_cast<const unsigned int>(slotLimit)) {
            const int usedSlots = slotLimit - static_cast<const int>(slotsFree);
            storeAppendPrintf(&e, "Used slots: %9d %.2f%%\n",
                              usedSlots, (100.0 * usedSlots / slotLimit));
        }
        if (slotLimit < 100) { // XXX: otherwise too expensive to count
            Ipc::ReadWriteLockStats stats;
            map->updateStats(stats);
            stats.dump(e);
        }
    }

    storeAppendPrintf(&e, "Pending operations: %d out of %d\n",
                      store_open_disk_fd, Config.max_open_disk_fds);

    storeAppendPrintf(&e, "Flags:");

    if (flags.selected)
        storeAppendPrintf(&e, " SELECTED");

    if (flags.read_only)
        storeAppendPrintf(&e, " READ-ONLY");

    storeAppendPrintf(&e, "\n");
}

SBuf
Rock::SwapDir::inodeMapPath() const
{
    return Ipc::Mem::Segment::Name(SBuf(path), "map");
}

const char *
Rock::SwapDir::freeSlotsPath() const
{
    static String spacesPath;
    spacesPath = path;
    spacesPath.append("_spaces");
    return spacesPath.termedBuf();
}
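// Each cache_dir thus owns two shared-memory segments: inodeMapPath() names
// the entry/slice map and freeSlotsPath() names the free-slot page stack.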

namespace Rock
{
RunnerRegistrationEntry(SwapDirRr);
}

void Rock::SwapDirRr::create()
{
    Must(mapOwners.empty() && freeSlotsOwners.empty());
    for (int i = 0; i < Config.cacheSwap.n_configured; ++i) {
        if (const Rock::SwapDir *const sd = dynamic_cast<Rock::SwapDir *>(INDEXSD(i))) {
            const int64_t capacity = sd->slotLimitActual();

            SwapDir::DirMap::Owner *const mapOwner =
                SwapDir::DirMap::Init(sd->inodeMapPath(), capacity);
            mapOwners.push_back(mapOwner);

            // TODO: somehow remove pool id and counters from PageStack?
            Ipc::Mem::Owner<Ipc::Mem::PageStack> *const freeSlotsOwner =
                shm_new(Ipc::Mem::PageStack)(sd->freeSlotsPath(),
                                             i+1, capacity, 0);
            freeSlotsOwners.push_back(freeSlotsOwner);

            // TODO: add method to initialize PageStack with no free pages
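            // For now, drain the freshly created (full) stack by hand so that
            // no slot is considered free until something pushes it back,
            // presumably as the db rebuild validates slots.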
            while (true) {
                Ipc::Mem::PageId pageId;
                if (!freeSlotsOwner->object()->pop(pageId))
                    break;
            }
        }
    }
}

Rock::SwapDirRr::~SwapDirRr()
{
    for (size_t i = 0; i < mapOwners.size(); ++i) {
        delete mapOwners[i];
        delete freeSlotsOwners[i];
    }
}