/* src/fs/rock/RockSwapDir.cc */
1 /*
2 * DEBUG: section 47 Store Directory Routines
3 */
4
5 #include "squid.h"
6 #include "cache_cf.h"
7 #include "ConfigOption.h"
8 #include "DiskIO/DiskIOModule.h"
9 #include "DiskIO/DiskIOStrategy.h"
10 #include "DiskIO/ReadRequest.h"
11 #include "DiskIO/WriteRequest.h"
12 #include "fs/rock/RockSwapDir.h"
13 #include "fs/rock/RockIoState.h"
14 #include "fs/rock/RockIoRequests.h"
15 #include "fs/rock/RockRebuild.h"
16 #include "globals.h"
17 #include "ipc/mem/Pages.h"
18 #include "MemObject.h"
19 #include "Parsing.h"
20 #include "SquidConfig.h"
21 #include "SquidMath.h"
22 #include "tools.h"
23
24 #include <cstdlib>
25 #include <iomanip>
26
27 #if HAVE_SYS_STAT_H
28 #include <sys/stat.h>
29 #endif
30
31 const int64_t Rock::SwapDir::HeaderSize = 16*1024;
32
/// constructs an unconfigured rock SwapDir; real setup happens in parse() and init()
Rock::SwapDir::SwapDir(): ::SwapDir("rock"),
        slotSize(HeaderSize), filePath(NULL), io(NULL), map(NULL), dbSlots(NULL)
{
}
37
Rock::SwapDir::~SwapDir()
{
    delete io;           // DiskIOStrategy created in init()
    delete map;          // shared directory map created in init()
    safe_free(filePath); // xstrdup()ed in parse()
}
44
/// not implemented yet; any call is a bug
StoreSearch *
Rock::SwapDir::search(String const url, HttpRequest *)
{
    assert(false);
    return NULL; // XXX: implement
}
51
/// asynchronous lookup; we have nothing special to do, so defer to the base class
void
Rock::SwapDir::get(String const key, STOREGETCLIENT cb, void *data)
{
    ::SwapDir::get(key, cb, data);
}
57
// called when Squid core needs a StoreEntry with a given key
StoreEntry *
Rock::SwapDir::get(const cache_key *key)
{
    // cannot serve hits until the map exists and the db file is readable
    if (!map || !theFile || !theFile->canRead())
        return NULL;

    sfileno filen;
    // locks the map slot for reading; the lock is kept for the entry's lifetime
    const Ipc::StoreMapSlot *const slot = map->openForReading(key, filen);
    if (!slot)
        return NULL;

    const Ipc::StoreMapSlot::Basics &basics = slot->basics;

    // create a brand new store entry and initialize it with stored basics
    StoreEntry *e = new StoreEntry();
    e->lock_count = 0;
    e->swap_dirn = index;
    e->swap_filen = filen;
    e->swap_file_sz = basics.swap_file_sz;
    e->lastref = basics.lastref;
    e->timestamp = basics.timestamp;
    e->expires = basics.expires;
    e->lastmod = basics.lastmod;
    e->refcount = basics.refcount;
    e->flags = basics.flags;
    e->store_status = STORE_OK;
    e->setMemStatus(NOT_IN_MEMORY);
    e->swap_status = SWAPOUT_DONE; // the entry is fully on disk
    e->ping_status = PING_NONE;
    EBIT_SET(e->flags, ENTRY_CACHABLE);
    EBIT_CLR(e->flags, RELEASE_REQUEST);
    EBIT_CLR(e->flags, KEY_PRIVATE);
    EBIT_SET(e->flags, ENTRY_VALIDATED);
    e->hashInsert(key);
    trackReferences(*e); // tell the removal policy about this entry

    return e;
    // the disk entry remains open for reading, protected from modifications
}
98
/// dissociates the entry from this cache_dir, releasing its map slot lock
void Rock::SwapDir::disconnect(StoreEntry &e)
{
    assert(e.swap_dirn == index);
    assert(e.swap_filen >= 0);
    // cannot have SWAPOUT_NONE entry with swap_filen >= 0
    assert(e.swap_status != SWAPOUT_NONE);

    // do not rely on e.swap_status here because there is an async delay
    // before it switches from SWAPOUT_WRITING to SWAPOUT_DONE.

    // since e has swap_filen, its slot is locked for either reading or writing
    map->abortIo(e.swap_filen);
    e.swap_dirn = -1;
    e.swap_filen = -1;
    e.swap_status = SWAPOUT_NONE;
}
115
116 uint64_t
117 Rock::SwapDir::currentSize() const
118 {
119 const uint64_t spaceSize = !dbSlotIndex ?
120 maxSize() : (slotSize * dbSlotIndex->size());
121 // everything that is not free is in use
122 return maxSize() - spaceSize;
123 }
124
125 uint64_t
126 Rock::SwapDir::currentCount() const
127 {
128 return map ? map->entryCount() : 0;
129 }
130
131 /// In SMP mode only the disker process reports stats to avoid
132 /// counting the same stats by multiple processes.
133 bool
134 Rock::SwapDir::doReportStat() const
135 {
136 return ::SwapDir::doReportStat() && (!UsingSmp() || IamDiskProcess());
137 }
138
/// called after a swap-out completes; we keep no per-swap-out counters
void
Rock::SwapDir::swappedOut(const StoreEntry &)
{
    // stats are not stored but computed when needed
}
144
145 int64_t
146 Rock::SwapDir::entryLimitAllowed() const
147 {
148 const int64_t eLimitLo = map ? map->entryLimit() : 0; // dynamic shrinking unsupported
149 const int64_t eWanted = (maxSize() - HeaderSize)/slotSize;
150 return min(max(eLimitLo, eWanted), entryLimitHigh());
151 }
152
// TODO: encapsulate as a tool; identical to CossSwapDir::create()
/// creates the db directory and preallocates the db file on disk
void
Rock::SwapDir::create()
{
    assert(path);
    assert(filePath);

    // in SMP mode, only the disker process touches the disk
    if (UsingSmp() && !IamDiskProcess()) {
        debugs (47,3, HERE << "disker will create in " << path);
        return;
    }

    debugs (47,3, HERE << "creating in " << path);

    struct stat swap_sb;
    // create the cache_dir directory itself if it does not exist yet
    if (::stat(path, &swap_sb) < 0) {
        debugs (47, DBG_IMPORTANT, "Creating Rock db directory: " << path);
        const int res = mkdir(path, 0700);
        if (res != 0)
            createError("mkdir");
    }

    // (re)create the db file, truncating any previous contents
    const int swap = open(filePath, O_WRONLY|O_CREAT|O_TRUNC|O_BINARY, 0600);
    if (swap < 0)
        createError("create");

#if SLOWLY_FILL_WITH_ZEROS
    // explicitly write zeros over the whole db to reserve the space
    char block[1024];
    Must(maxSize() % sizeof(block) == 0);
    memset(block, '\0', sizeof(block));

    for (off_t offset = 0; offset < maxSize(); offset += sizeof(block)) {
        if (write(swap, block, sizeof(block)) != sizeof(block))
            createError("write");
    }
#else
    // grow the file to full size cheaply; only the header is written out
    if (ftruncate(swap, maxSize()) != 0)
        createError("truncate");

    char header[HeaderSize];
    memset(header, '\0', sizeof(header));
    if (write(swap, header, sizeof(header)) != sizeof(header))
        createError("write");
#endif

    close(swap);
}
200
// report Rock DB creation error and exit
/// \param msg the failed operation name, included in the fatal message
void
Rock::SwapDir::createError(const char *const msg) {
    debugs(47, DBG_CRITICAL, "ERROR: Failed to initialize Rock Store db in " <<
           filePath << "; " << msg << " error: " << xstrerror());
    fatal("Rock Store db creation error");
}
208
/// opens the db file, attaches shared memory segments, and starts the rebuild
void
Rock::SwapDir::init()
{
    debugs(47,2, HERE);

    // XXX: SwapDirs aren't refcounted. We make IORequestor calls, which
    // are refcounted. We up our count once to avoid implicit delete's.
    lock();

    Must(!map);
    map = new DirMap(path);

    // blocking I/O is sufficient without dedicated disker processes
    const char *ioModule = needsDiskStrand() ? "IpcIo" : "Blocking";
    if (DiskIOModule *m = DiskIOModule::Find(ioModule)) {
        debugs(47,2, HERE << "Using DiskIO module: " << ioModule);
        io = m->createStrategy();
        io->init();
    } else {
        debugs(47, DBG_CRITICAL, "FATAL: Rock store is missing DiskIO module: " <<
               ioModule);
        fatal("Rock Store missing a required DiskIO module");
    }

    theFile = io->newFile(filePath);
    theFile->configure(fileConfig);
    theFile->open(O_RDWR, 0644, this);

    // attach to the shared free-slot index created by SwapDirRr::create();
    // the DbCellHeader array lives in the same segment, after the stack
    dbSlotIndex = shm_old(Ipc::Mem::PageStack)(path);
    dbSlots = new (reinterpret_cast<char *>(dbSlotIndex.getRaw()) +
                   dbSlotIndex->stackSize()) DbCellHeader[entryLimitAllowed()];

    // Increment early. Otherwise, if one SwapDir finishes rebuild before
    // others start, storeRebuildComplete() will think the rebuild is over!
    // TODO: move store_dirs_rebuilding hack to store modules that need it.
    ++StoreController::store_dirs_rebuilding;
}
245
246 bool
247 Rock::SwapDir::needsDiskStrand() const
248 {
249 const bool wontEvenWorkWithoutDisker = Config.workers > 1;
250 const bool wouldWorkBetterWithDisker = DiskIOModule::Find("IpcIo");
251 return InDaemonMode() && (wontEvenWorkWithoutDisker ||
252 wouldWorkBetterWithDisker);
253 }
254
/// parses our cache_dir configuration line (initial configuration only)
void
Rock::SwapDir::parse(int anIndex, char *aPath)
{
    index = anIndex;

    path = xstrdup(aPath);

    // cache store is located at path/db
    String fname(path);
    fname.append("/rock");
    filePath = xstrdup(fname.termedBuf());

    parseSize(false);
    parseOptions(0);

    // Current openForWriting() code overwrites the old slot if needed
    // and possible, so proactively removing old slots is probably useless.
    assert(!repl); // repl = createRemovalPolicy(Config.replPolicy);

    validateOptions();
}
276
/// re-parses our configuration line on reconfigure; size and most options
/// cannot actually change and only produce warnings
void
Rock::SwapDir::reconfigure()
{
    parseSize(true);
    parseOptions(1);
    // TODO: can we reconfigure the replacement policy (repl)?
    validateOptions();
}
285
/// parse maximum db disk size
/// \param reconfiguring true when called from reconfigure(); the size
///        cannot change dynamically then, only warn on mismatch
void
Rock::SwapDir::parseSize(const bool reconfiguring)
{
    const int i = GetInteger();
    if (i < 0)
        fatal("negative Rock cache_dir size value");
    const uint64_t new_max_size =
        static_cast<uint64_t>(i) << 20; // MBytes to Bytes
    if (!reconfiguring)
        max_size = new_max_size;
    else if (new_max_size != max_size) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir '" << path << "' size "
               "cannot be changed dynamically, value left unchanged (" <<
               (max_size >> 20) << " MB)");
    }
}
303
/// extends the base option parser with rock-specific slot-size,
/// swap-timeout, and max-swap-rate options
ConfigOption *
Rock::SwapDir::getOptionTree() const
{
    ConfigOptionVector *vector = dynamic_cast<ConfigOptionVector*>(::SwapDir::getOptionTree());
    assert(vector);
    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseSizeOption, &SwapDir::dumpSizeOption));
    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseTimeOption, &SwapDir::dumpTimeOption));
    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseRateOption, &SwapDir::dumpRateOption));
    return vector;
}
314
315 bool
316 Rock::SwapDir::allowOptionReconfigure(const char *const option) const
317 {
318 return strcmp(option, "slot-size") != 0 &&
319 ::SwapDir::allowOptionReconfigure(option);
320 }
321
/// parses time-specific options; mimics ::SwapDir::optionObjectSizeParse()
/// \returns false when the option name is not one of ours
bool
Rock::SwapDir::parseTimeOption(char const *option, const char *value, int reconfiguring)
{
    // TODO: ::SwapDir or, better, Config should provide time-parsing routines,
    // including time unit handling. Same for size and rate.

    time_msec_t *storedTime;
    if (strcmp(option, "swap-timeout") == 0)
        storedTime = &fileConfig.ioTimeout;
    else
        return false;

    if (!value)
        self_destruct();

    // TODO: handle time units and detect parsing errors better
    const int64_t parsedValue = strtoll(value, NULL, 10);
    if (parsedValue < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
        self_destruct();
    }

    const time_msec_t newTime = static_cast<time_msec_t>(parsedValue);

    // on reconfigure, the timeout cannot change; warn if the value differs
    if (!reconfiguring)
        *storedTime = newTime;
    else if (*storedTime != newTime) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedTime);
    }

    return true;
}
357
358 /// reports time-specific options; mimics ::SwapDir::optionObjectSizeDump()
359 void
360 Rock::SwapDir::dumpTimeOption(StoreEntry * e) const
361 {
362 if (fileConfig.ioTimeout)
363 storeAppendPrintf(e, " swap-timeout=%" PRId64,
364 static_cast<int64_t>(fileConfig.ioTimeout));
365 }
366
367 /// parses rate-specific options; mimics ::SwapDir::optionObjectSizeParse()
368 bool
369 Rock::SwapDir::parseRateOption(char const *option, const char *value, int isaReconfig)
370 {
371 int *storedRate;
372 if (strcmp(option, "max-swap-rate") == 0)
373 storedRate = &fileConfig.ioRate;
374 else
375 return false;
376
377 if (!value)
378 self_destruct();
379
380 // TODO: handle time units and detect parsing errors better
381 const int64_t parsedValue = strtoll(value, NULL, 10);
382 if (parsedValue < 0) {
383 debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
384 self_destruct();
385 }
386
387 const int newRate = static_cast<int>(parsedValue);
388
389 if (newRate < 0) {
390 debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << newRate);
391 self_destruct();
392 }
393
394 if (!isaReconfig)
395 *storedRate = newRate;
396 else if (*storedRate != newRate) {
397 debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
398 << " cannot be changed dynamically, value left unchanged: " <<
399 *storedRate);
400 }
401
402 return true;
403 }
404
405 /// reports rate-specific options; mimics ::SwapDir::optionObjectSizeDump()
406 void
407 Rock::SwapDir::dumpRateOption(StoreEntry * e) const
408 {
409 if (fileConfig.ioRate >= 0)
410 storeAppendPrintf(e, " max-swap-rate=%d", fileConfig.ioRate);
411 }
412
413 /// parses size-specific options; mimics ::SwapDir::optionObjectSizeParse()
414 bool
415 Rock::SwapDir::parseSizeOption(char const *option, const char *value, int reconfiguring)
416 {
417 uint64_t *storedSize;
418 if (strcmp(option, "slot-size") == 0)
419 storedSize = &slotSize;
420 else
421 return false;
422
423 if (!value)
424 self_destruct();
425
426 // TODO: handle size units and detect parsing errors better
427 const uint64_t newSize = strtoll(value, NULL, 10);
428 if (newSize <= 0) {
429 debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must be positive; got: " << newSize);
430 self_destruct();
431 }
432
433 if (newSize <= sizeof(DbCellHeader)) {
434 debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must exceed " << sizeof(DbCellHeader) << "; got: " << newSize);
435 self_destruct();
436 }
437
438 if (!reconfiguring)
439 *storedSize = newSize;
440 else if (*storedSize != newSize) {
441 debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
442 << " cannot be changed dynamically, value left unchanged: " <<
443 *storedSize);
444 }
445
446 return true;
447 }
448
449 /// reports size-specific options; mimics ::SwapDir::optionObjectSizeDump()
450 void
451 Rock::SwapDir::dumpSizeOption(StoreEntry * e) const
452 {
453 storeAppendPrintf(e, " slot-size=%" PRId64, slotSize);
454 }
455
/// check the results of the configuration; only level-0 debugging works here
void
Rock::SwapDir::validateOptions()
{
    if (slotSize <= 0)
        fatal("Rock store requires a positive slot-size");

    // rounding the configured sizes down to whole slots/MBs wastes some disk;
    // anything beyond that waste indicates the sfileno-based entry limit bites
    const int64_t maxSizeRoundingWaste = 1024 * 1024; // size is configured in MB
    const int64_t slotSizeRoundingWaste = slotSize;
    const int64_t maxRoundingWaste =
        max(maxSizeRoundingWaste, slotSizeRoundingWaste);
    const int64_t usableDiskSize = diskOffset(entryLimitAllowed());
    const int64_t diskWasteSize = maxSize() - usableDiskSize;
    Must(diskWasteSize >= 0);

    // warn if maximum db size is not reachable due to sfileno limit
    if (entryLimitAllowed() == entryLimitHigh() &&
            diskWasteSize >= maxRoundingWaste) {
        debugs(47, DBG_CRITICAL, "Rock store cache_dir[" << index << "] '" << path << "':");
        debugs(47, DBG_CRITICAL, "\tmaximum number of entries: " << entryLimitAllowed());
        debugs(47, DBG_CRITICAL, "\tdb slot size: " << slotSize << " Bytes");
        debugs(47, DBG_CRITICAL, "\tmaximum db size: " << maxSize() << " Bytes");
        debugs(47, DBG_CRITICAL, "\tusable db size:  " << usableDiskSize << " Bytes");
        debugs(47, DBG_CRITICAL, "\tdisk space waste: " << diskWasteSize << " Bytes");
        debugs(47, DBG_CRITICAL, "WARNING: Rock store config wastes space.");
    }
}
483
/// starts the asynchronous index rebuild job for this cache_dir
void
Rock::SwapDir::rebuild()
{
    //++StoreController::store_dirs_rebuilding; // see Rock::SwapDir::init()
    AsyncJob::Start(new Rebuild(this));
}
490
/// whether we can store an entry of the given size; on success, sets load
bool
Rock::SwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const
{
    // account for the per-slot header overhead when checking size limits
    if (!::SwapDir::canStore(e, sizeof(DbCellHeader)+diskSpaceNeeded, load))
        return false;

    if (!theFile || !theFile->canWrite())
        return false;

    if (!map)
        return false;

    // TODO: consider DB slots freed when older object would be replaced
    // need enough free db slots for the whole entry (at least one)
    if (dbSlotIndex->size() <
            static_cast<unsigned int>(max(entriesNeeded(diskSpaceNeeded), 1)))
        return false;

    // Do not start I/O transaction if there are less than 10% free pages left.
    // TODO: reserve page instead
    if (needsDiskStrand() &&
            Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
        debugs(47, 5, HERE << "too few shared pages for IPC I/O left");
        return false;
    }

    if (io->shedLoad())
        return false;

    load = io->load();
    return true;
}
522
523 StoreIOState::Pointer
524 Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
525 {
526 if (!theFile || theFile->error()) {
527 debugs(47,4, HERE << theFile);
528 return NULL;
529 }
530
531 sfileno filen;
532 Ipc::StoreMapSlot *const slot =
533 map->openForWriting(reinterpret_cast<const cache_key *>(e.key), filen);
534 if (!slot) {
535 debugs(47, 5, HERE << "map->add failed");
536 return NULL;
537 }
538
539 Ipc::Mem::PageId pageId;
540 if (!popDbSlot(pageId)) {
541 debugs(79, DBG_IMPORTANT, "WARNING: Rock cache_dir '" << filePath <<
542 "' run out of DB slots");
543 map->free(filen);
544 }
545
546 slot->set(e);
547
548 // XXX: We rely on our caller, storeSwapOutStart(), to set e.fileno.
549 // If that does not happen, the entry will not decrement the read level!
550
551 IoState *sio = new IoState(*this, &e, cbFile, cbIo, data);
552
553 sio->swap_dirn = index;
554 sio->swap_filen = filen;
555 sio->diskOffset = diskOffset(pageId);
556
557 DbCellHeader &firstDbSlot = dbSlot(pageId);
558 memcpy(firstDbSlot.key, e.key, sizeof(firstDbSlot.key));
559 firstDbSlot.firstSlot = pageId.number;
560 firstDbSlot.nextSlot = 0;
561 ++firstDbSlot.version;
562 firstDbSlot.payloadSize = 0;
563 sio->dbSlot = &firstDbSlot;
564
565 debugs(47,5, HERE << "dir " << index << " created new filen " <<
566 std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
567 sio->swap_filen << std::dec << " at " <<
568 diskOffset(sio->swap_filen));
569
570 sio->file(theFile);
571
572 trackReferences(e);
573 return sio;
574 }
575
576 int64_t
577 Rock::SwapDir::diskOffset(int filen) const
578 {
579 assert(filen >= 0);
580 return HeaderSize + slotSize*filen;
581 }
582
583 int64_t
584 Rock::SwapDir::diskOffset(Ipc::Mem::PageId &pageId) const
585 {
586 assert(pageId);
587 return diskOffset(pageId.number - 1);
588 }
589
590 int64_t
591 Rock::SwapDir::diskOffsetLimit() const
592 {
593 assert(map);
594 return diskOffset(map->entryLimit());
595 }
596
597 int
598 Rock::SwapDir::entryMaxPayloadSize() const
599 {
600 return slotSize - sizeof(DbCellHeader);
601 }
602
603 int
604 Rock::SwapDir::entriesNeeded(const int64_t objSize) const
605 {
606 return (objSize + entryMaxPayloadSize() - 1) / entryMaxPayloadSize();
607 }
608
/// takes a free db slot from the shared slot index;
/// returns false when no free slots are left
bool
Rock::SwapDir::popDbSlot(Ipc::Mem::PageId &pageId)
{
    return dbSlotIndex->pop(pageId);
}
614
/// the in-shared-memory header of the db slot behind pageId (writable)
Rock::DbCellHeader &
Rock::SwapDir::dbSlot(const Ipc::Mem::PageId &pageId)
{
    // delegate to the const overload to keep the lookup logic in one place
    const DbCellHeader &s = const_cast<const SwapDir *>(this)->dbSlot(pageId);
    return const_cast<DbCellHeader &>(s);
}
621
/// the in-shared-memory header of the db slot behind pageId (read-only)
const Rock::DbCellHeader &
Rock::SwapDir::dbSlot(const Ipc::Mem::PageId &pageId) const
{
    assert(dbSlotIndex->pageIdIsValid(pageId));
    return dbSlots[pageId.number - 1]; // page numbers are 1-based
}
628
629 void
630 Rock::SwapDir::cleanReadable(const sfileno fileno)
631 {
632 Ipc::Mem::PageId pageId = map->extras(fileno).pageId;
633 Ipc::Mem::PageId nextPageId = pageId;
634 while (pageId) {
635 const DbCellHeader &curDbSlot = dbSlot(pageId);
636 nextPageId.number = curDbSlot.nextSlot;
637 const DbCellHeader &nextDbSlot = dbSlot(nextPageId);
638 const bool sameChain = memcmp(curDbSlot.key, nextDbSlot.key,
639 sizeof(curDbSlot.key)) == 0 &&
640 curDbSlot.version == nextDbSlot.version;
641 dbSlotIndex->push(pageId);
642 if (sameChain)
643 pageId = nextPageId;
644 }
645 }
646
647 // tries to open an old or being-written-to entry with swap_filen for reading
648 StoreIOState::Pointer
649 Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
650 {
651 if (!theFile || theFile->error()) {
652 debugs(47,4, HERE << theFile);
653 return NULL;
654 }
655
656 if (e.swap_filen < 0) {
657 debugs(47,4, HERE << e);
658 return NULL;
659 }
660
661 // Do not start I/O transaction if there are less than 10% free pages left.
662 // TODO: reserve page instead
663 if (needsDiskStrand() &&
664 Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
665 debugs(47, 5, HERE << "too few shared pages for IPC I/O left");
666 return NULL;
667 }
668
669 // The are two ways an entry can get swap_filen: our get() locked it for
670 // reading or our storeSwapOutStart() locked it for writing. Peeking at our
671 // locked entry is safe, but no support for reading a filling entry.
672 const Ipc::StoreMapSlot *slot = map->peekAtReader(e.swap_filen);
673 if (!slot)
674 return NULL; // we were writing afterall
675
676 IoState *sio = new IoState(*this, &e, cbFile, cbIo, data);
677
678 sio->swap_dirn = index;
679 sio->swap_filen = e.swap_filen;
680 sio->dbSlot = &dbSlot(map->extras(e.swap_filen).pageId);
681
682 const Ipc::Mem::PageId &pageId = map->extras(e.swap_filen).pageId;
683 sio->diskOffset = diskOffset(pageId);
684 DbCellHeader &firstDbSlot = dbSlot(map->extras(e.swap_filen).pageId);
685 assert(memcmp(firstDbSlot.key, e.key, sizeof(firstDbSlot.key)));
686 assert(firstDbSlot.firstSlot == pageId.number);
687
688 debugs(47,5, HERE << "dir " << index << " has old filen: " <<
689 std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
690 sio->swap_filen);
691
692 assert(slot->basics.swap_file_sz > 0);
693 assert(slot->basics.swap_file_sz == e.swap_file_sz);
694
695 sio->file(theFile);
696 return sio;
697 }
698
/// called by the DiskIO layer when our db file open attempt has completed
void
Rock::SwapDir::ioCompletedNotification()
{
    if (!theFile)
        fatalf("Rock cache_dir failed to initialize db file: %s", filePath);

    if (theFile->error())
        fatalf("Rock cache_dir at %s failed to open db file: %s", filePath,
               xstrerror());

    debugs(47, 2, "Rock cache_dir[" << index << "] limits: " <<
           std::setw(12) << maxSize() << " disk bytes and " <<
           std::setw(7) << map->entryLimit() << " entries");

    // the file is usable; start loading the index from disk
    rebuild();
}
715
/// called by the DiskIO layer when the db file has been closed
void
Rock::SwapDir::closeCompleted()
{
    theFile = NULL; // drop our reference; RefCount handles destruction
}
721
/// called by the DiskIO layer when a read we scheduled has completed
void
Rock::SwapDir::readCompleted(const char *buf, int rlen, int errflag, RefCount< ::ReadRequest> r)
{
    ReadRequest *request = dynamic_cast<Rock::ReadRequest*>(r.getRaw());
    assert(request);
    IoState::Pointer sio = request->sio;

    // advance the read position only on a successful, non-empty read
    if (errflag == DISK_OK && rlen > 0)
        sio->offset_ += rlen;

    // save and clear the callback before invoking it, so it fires only once
    StoreIOState::STRCB *callback = sio->read.callback;
    assert(callback);
    sio->read.callback = NULL;
    void *cbdata;
    if (cbdataReferenceValidDone(sio->read.callback_data, &cbdata))
        callback(cbdata, r->buf, rlen, sio.getRaw());
}
739
/// called by the DiskIO layer when a write we scheduled has completed
void
Rock::SwapDir::writeCompleted(int errflag, size_t rlen, RefCount< ::WriteRequest> r)
{
    Rock::WriteRequest *request = dynamic_cast<Rock::WriteRequest*>(r.getRaw());
    assert(request);
    assert(request->sio !=  NULL);
    IoState &sio = *request->sio;

    if (errflag == DISK_OK) {
        // close, assuming we only write once; the entry gets the read lock
        map->closeForWriting(sio.swap_filen, true);
        // do not increment sio.offset_ because we do it in sio->write()
        if (request->isLast)
            sio.finishedWriting(errflag);
    } else
        writeError(sio.swap_filen); // the entry becomes unusable
}
757
/// handles a failed write by marking the affected entry unusable
void
Rock::SwapDir::writeError(const sfileno fileno)
{
    // Do not abortWriting here. The entry should keep the write lock
    // instead of losing association with the store and confusing core.
    map->free(fileno); // will mark as unusable, just in case
    // XXX: should we call IoState callback?
}
766
767 bool
768 Rock::SwapDir::full() const
769 {
770 return map && map->full();
771 }
772
// storeSwapOutFileClosed calls this nethod on DISK_NO_SPACE_LEFT,
// but it should not happen for us
/// the db file is preallocated in create(), so running out of space is a bug
void
Rock::SwapDir::diskFull()
{
    debugs(20, DBG_IMPORTANT, "BUG: No space left with rock cache_dir: " <<
           filePath);
}
781
/// purge while full(); it should be sufficient to purge just one
void
Rock::SwapDir::maintain()
{
    debugs(47,3, HERE << "cache_dir[" << index << "] guards: " <<
           !repl << !map << !full() << StoreController::store_dirs_rebuilding);

    if (!repl)
        return; // no means (cannot find a victim)

    if (!map)
        return; // no victims (yet)

    if (!full())
        return; // no need (to find a victim)

    // XXX: UFSSwapDir::maintain says we must quit during rebuild
    if (StoreController::store_dirs_rebuilding)
        return;

    debugs(47,3, HERE << "cache_dir[" << index << "] state: " << map->full() <<
           ' ' << currentSize() << " < " << diskOffsetLimit());

    // Hopefully, we find a removable entry much sooner (TODO: use time?)
    const int maxProbed = 10000;
    RemovalPurgeWalker *walker = repl->PurgeInit(repl, maxProbed);

    // It really should not take that long, but this will stop "infinite" loops
    const int maxFreed = 1000;
    int freed = 0;
    // TODO: should we purge more than needed to minimize overheads?
    for (; freed < maxFreed && full(); ++freed) {
        if (StoreEntry *e = walker->Next(walker))
            e->release(); // will call our unlink() method
        else
            break; // no more objects
    }

    debugs(47,2, HERE << "Rock cache_dir[" << index << "] freed " << freed <<
           " scanned " << walker->scanned << '/' << walker->locked);

    walker->Done(walker);

    if (full()) {
        debugs(47, DBG_CRITICAL, "ERROR: Rock cache_dir[" << index << "] " <<
               "is still full after freeing " << freed << " entries. A bug?");
    }
}
830
831 void
832 Rock::SwapDir::reference(StoreEntry &e)
833 {
834 debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
835 if (repl && repl->Referenced)
836 repl->Referenced(repl, &e, &e.repl);
837 }
838
839 bool
840 Rock::SwapDir::dereference(StoreEntry &e, bool)
841 {
842 debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
843 if (repl && repl->Dereferenced)
844 repl->Dereferenced(repl, &e, &e.repl);
845
846 // no need to keep e in the global store_table for us; we have our own map
847 return false;
848 }
849
/// whether the external unlinkd helper would be useful for this dir
bool
Rock::SwapDir::unlinkdUseful() const
{
    // no entry-specific files to unlink
    return false;
}
856
/// removes the entry from this cache_dir, releasing its map slot
void
Rock::SwapDir::unlink(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    ignoreReferences(e); // the removal policy must forget the entry first
    map->free(e.swap_filen);
    disconnect(e);
}
865
866 void
867 Rock::SwapDir::trackReferences(StoreEntry &e)
868 {
869 debugs(47, 5, HERE << e);
870 if (repl)
871 repl->Add(repl, &e, &e.repl);
872 }
873
874 void
875 Rock::SwapDir::ignoreReferences(StoreEntry &e)
876 {
877 debugs(47, 5, HERE << e);
878 if (repl)
879 repl->Remove(repl, &e, &e.repl);
880 }
881
/// appends this cache_dir's statistics to the given cache manager report
void
Rock::SwapDir::statfs(StoreEntry &e) const
{
    storeAppendPrintf(&e, "\n");
    storeAppendPrintf(&e, "Maximum Size: %" PRIu64 " KB\n", maxSize() >> 10);
    storeAppendPrintf(&e, "Current Size: %.2f KB %.2f%%\n",
                      currentSize() / 1024.0,
                      Math::doublePercent(currentSize(), maxSize()));

    if (map) {
        const int limit = map->entryLimit();
        storeAppendPrintf(&e, "Maximum entries: %9d\n", limit);
        if (limit > 0) {
            const int entryCount = map->entryCount();
            storeAppendPrintf(&e, "Current entries: %9d %.2f%%\n",
                              entryCount, (100.0 * entryCount / limit));

            // lock statistics require scanning every map slot
            if (limit < 100) { // XXX: otherwise too expensive to count
                Ipc::ReadWriteLockStats stats;
                map->updateStats(stats);
                stats.dump(e);
            }
        }
    }

    storeAppendPrintf(&e, "Pending operations: %d out of %d\n",
                      store_open_disk_fd, Config.max_open_disk_fds);

    storeAppendPrintf(&e, "Flags:");

    if (flags.selected)
        storeAppendPrintf(&e, " SELECTED");

    if (flags.read_only)
        storeAppendPrintf(&e, " READ-ONLY");

    storeAppendPrintf(&e, "\n");

}
921
namespace Rock
{
/// schedules SwapDirRr::create() to run after squid.conf has been parsed
RunnerRegistrationEntry(rrAfterConfig, SwapDirRr);
}
926
/// creates the shared-memory segments (entry map and db slot index)
/// for every configured rock cache_dir
void Rock::SwapDirRr::create(const RunnerRegistry &)
{
    Must(mapOwners.empty() && dbSlotsOwners.empty());
    for (int i = 0; i < Config.cacheSwap.n_configured; ++i) {
        if (const Rock::SwapDir *const sd = dynamic_cast<Rock::SwapDir *>(INDEXSD(i))) {
            const int64_t capacity = sd->entryLimitAllowed();

            // shared entry map segment, one slot per possible entry
            String inodesPath = sd->path;
            inodesPath.append("_inodes");
            SwapDir::DirMap::Owner *const mapOwner =
                SwapDir::DirMap::Init(inodesPath.termedBuf(), capacity);
            mapOwners.push_back(mapOwner);

            // shared free db slot index segment
            String spacesPath = sd->path;
            spacesPath.append("_spaces");
            // XXX: remove pool id and counters from PageStack
            Ipc::Mem::Owner<Ipc::Mem::PageStack> *const dbSlotsOwner =
                shm_new(Ipc::Mem::PageStack)(spacesPath.termedBuf(),
                                             i, capacity,
                                             sizeof(DbCellHeader));
            dbSlotsOwners.push_back(dbSlotsOwner);

            // drain the stack so the rebuild can push back only truly free slots
            // XXX: add method to initialize PageStack with no free pages
            while (true) {
                Ipc::Mem::PageId pageId;
                if (!dbSlotsOwner->object()->pop(pageId))
                    break;
            }
        }
    }
}
958
959 Rock::SwapDirRr::~SwapDirRr()
960 {
961 for (size_t i = 0; i < mapOwners.size(); ++i) {
962 delete mapOwners[i];
963 delete dbSlotsOwners[i];
964 }
965 }