]> git.ipfire.org Git - thirdparty/squid.git/blob - src/fs/rock/RockSwapDir.cc
SourceFormat Enforcement
[thirdparty/squid.git] / src / fs / rock / RockSwapDir.cc
1 /*
2 * DEBUG: section 47 Store Directory Routines
3 */
4
#include "squid.h"
#include "cache_cf.h"
#include "ConfigOption.h"
#include "DiskIO/DiskIOModule.h"
#include "DiskIO/DiskIOStrategy.h"
#include "DiskIO/ReadRequest.h"
#include "DiskIO/WriteRequest.h"
#include "fs/rock/RockIoRequests.h"
#include "fs/rock/RockIoState.h"
#include "fs/rock/RockRebuild.h"
#include "fs/rock/RockSwapDir.h"
#include "globals.h"
#include "ipc/mem/Pages.h"
#include "MemObject.h"
#include "Parsing.h"
#include "SquidConfig.h"
#include "SquidMath.h"
#include "tools.h"

#include <cstdlib>
#include <iomanip>
#include <limits>

#if HAVE_SYS_STAT_H
#include <sys/stat.h>
#endif
30
/// bytes reserved at the start of the db file, before the first entry cell
const int64_t Rock::SwapDir::HeaderSize = 16*1024;
32
/// constructs an unconfigured rock cache_dir; filePath, io, and map are
/// established later by parse() and init()
Rock::SwapDir::SwapDir(): ::SwapDir("rock"), filePath(NULL), io(NULL), map(NULL)
{
}
36
Rock::SwapDir::~SwapDir()
{
    delete io;
    delete map;
    safe_free(filePath); // xstrdup()ed in parse()
}
43
/// store searching is not supported by the rock store yet; must not be called
StoreSearch *
Rock::SwapDir::search(String const url, HttpRequest *)
{
    assert(false);
    return NULL; // XXX: implement
}
50
/// asynchronous lookup entry point; simply forwards to the default
/// ::SwapDir implementation
void
Rock::SwapDir::get(String const key, STOREGETCLIENT cb, void *data)
{
    ::SwapDir::get(key, cb, data);
}
56
// called when Squid core needs a StoreEntry with a given key
StoreEntry *
Rock::SwapDir::get(const cache_key *key)
{
    // we cannot serve hits until both the shared map and the db file are usable
    if (!map || !theFile || !theFile->canRead())
        return NULL;

    sfileno filen;
    // on success, the slot is locked for reading (see trailing note below)
    const Ipc::StoreMapSlot *const slot = map->openForReading(key, filen);
    if (!slot)
        return NULL;

    const Ipc::StoreMapSlot::Basics &basics = slot->basics;

    // create a brand new store entry and initialize it with stored basics
    StoreEntry *e = new StoreEntry();
    e->lock_count = 0;
    e->swap_dirn = index;
    e->swap_filen = filen;
    e->swap_file_sz = basics.swap_file_sz;
    e->lastref = basics.lastref;
    e->timestamp = basics.timestamp;
    e->expires = basics.expires;
    e->lastmod = basics.lastmod;
    e->refcount = basics.refcount;
    e->flags = basics.flags;
    e->store_status = STORE_OK;
    e->setMemStatus(NOT_IN_MEMORY);
    e->swap_status = SWAPOUT_DONE; // the entry is fully on disk
    e->ping_status = PING_NONE;
    EBIT_SET(e->flags, ENTRY_CACHABLE);
    EBIT_CLR(e->flags, RELEASE_REQUEST);
    EBIT_CLR(e->flags, KEY_PRIVATE);
    EBIT_SET(e->flags, ENTRY_VALIDATED);
    e->hashInsert(key);
    trackReferences(*e);

    return e;
    // the disk entry remains open for reading, protected from modifications
}
97
/// breaks the association between the entry and its db slot,
/// releasing whatever map lock the entry holds
void Rock::SwapDir::disconnect(StoreEntry &e)
{
    assert(e.swap_dirn == index);
    assert(e.swap_filen >= 0);
    // cannot have SWAPOUT_NONE entry with swap_filen >= 0
    assert(e.swap_status != SWAPOUT_NONE);

    // do not rely on e.swap_status here because there is an async delay
    // before it switches from SWAPOUT_WRITING to SWAPOUT_DONE.

    // since e has swap_filen, its slot is locked for either reading or writing
    map->abortIo(e.swap_filen);
    e.swap_dirn = -1;
    e.swap_filen = -1;
    e.swap_status = SWAPOUT_NONE;
}
114
115 uint64_t
116 Rock::SwapDir::currentSize() const
117 {
118 return HeaderSize + max_objsize * currentCount();
119 }
120
121 uint64_t
122 Rock::SwapDir::currentCount() const
123 {
124 return map ? map->entryCount() : 0;
125 }
126
/// In SMP mode only the disker process reports stats to avoid
/// counting the same stats by multiple processes.
bool
Rock::SwapDir::doReportStat() const
{
    // non-disker SMP kids would double-count the same shared cache_dir
    return ::SwapDir::doReportStat() && (!UsingSmp() || IamDiskProcess());
}
134
/// no bookkeeping needed after a swap-out completes
void
Rock::SwapDir::swappedOut(const StoreEntry &)
{
    // stats are not stored but computed when needed
}
140
/// maximum number of entries this cache_dir may hold, given the configured
/// db size, the per-entry cell size, and the absolute entryLimitHigh() cap
int64_t
Rock::SwapDir::entryLimitAllowed() const
{
    const int64_t eLimitLo = map ? map->entryLimit() : 0; // dynamic shrinking unsupported
    const int64_t eWanted = (maxSize() - HeaderSize)/maxObjectSize();
    return min(max(eLimitLo, eWanted), entryLimitHigh());
}
148
// TODO: encapsulate as a tool
/// creates the db directory and db file on disk, if they do not exist yet
void
Rock::SwapDir::create()
{
    assert(path);
    assert(filePath);

    // in SMP mode, db creation is delegated to the disker process
    if (UsingSmp() && !IamDiskProcess()) {
        debugs (47,3, HERE << "disker will create in " << path);
        return;
    }

    debugs (47,3, HERE << "creating in " << path);

    struct stat dir_sb;
    if (::stat(path, &dir_sb) == 0) {
        struct stat file_sb;
        if (::stat(filePath, &file_sb) == 0) {
            debugs (47, DBG_IMPORTANT, "Skipping existing Rock db: " << filePath);
            return;
        }
        // else the db file is not there or is not accessible, and we will try
        // to create it later below, generating a detailed error on failures.
    } else { // path does not exist or is inaccessible
        // If path exists but is not accessible, mkdir() below will fail, and
        // the admin should see the error and act accordingly, so there is
        // no need to distinguish ENOENT from other possible stat() errors.
        debugs (47, DBG_IMPORTANT, "Creating Rock db directory: " << path);
        const int res = mkdir(path, 0700);
        if (res != 0) {
            debugs(47, DBG_CRITICAL, "Failed to create Rock db dir " << path <<
                   ": " << xstrerror());
            fatal("Rock Store db creation error");
        }
    }

    debugs (47, DBG_IMPORTANT, "Creating Rock db: " << filePath);
#if SLOWLY_FILL_WITH_ZEROS
    // explicitly write maxSize() zeroed bytes, one block at a time
    char block[1024];
    Must(maxSize() % sizeof(block) == 0);
    memset(block, '\0', sizeof(block));

    const int swap = open(filePath, O_WRONLY|O_CREAT|O_TRUNC|O_BINARY, 0600);
    for (off_t offset = 0; offset < maxSize(); offset += sizeof(block)) {
        if (write(swap, block, sizeof(block)) != sizeof(block)) {
            debugs(47, DBG_CRITICAL, "ERROR: Failed to create Rock Store db in " << filePath <<
                   ": " << xstrerror());
            fatal("Rock Store db creation error");
        }
    }
    close(swap);
#else
    // faster: size the file with ftruncate() (possibly creating a sparse
    // file) and only write the reserved header area explicitly
    const int swap = open(filePath, O_WRONLY|O_CREAT|O_TRUNC|O_BINARY, 0600);
    if (swap < 0) {
        debugs(47, DBG_CRITICAL, "ERROR: Failed to initialize Rock Store db in " << filePath <<
               "; create error: " << xstrerror());
        fatal("Rock Store db creation error");
    }

    if (ftruncate(swap, maxSize()) != 0) {
        debugs(47, DBG_CRITICAL, "ERROR: Failed to initialize Rock Store db in " << filePath <<
               "; truncate error: " << xstrerror());
        fatal("Rock Store db creation error");
    }

    // zero out the HeaderSize bytes reserved at the start of the db
    char header[HeaderSize];
    memset(header, '\0', sizeof(header));
    if (write(swap, header, sizeof(header)) != sizeof(header)) {
        debugs(47, DBG_CRITICAL, "ERROR: Failed to initialize Rock Store db in " << filePath <<
               "; write error: " << xstrerror());
        fatal("Rock Store db initialization error");
    }
    close(swap);
#endif
}
224
/// opens the db file and creates the in-memory map; called once at startup
void
Rock::SwapDir::init()
{
    debugs(47,2, HERE);

    // XXX: SwapDirs aren't refcounted. We make IORequestor calls, which
    // are refcounted. We up our count once to avoid implicit delete's.
    lock();

    Must(!map);
    map = new DirMap(path);

    // use IpcIo (via a disker) when needed; otherwise do blocking disk I/O
    const char *ioModule = needsDiskStrand() ? "IpcIo" : "Blocking";
    if (DiskIOModule *m = DiskIOModule::Find(ioModule)) {
        debugs(47,2, HERE << "Using DiskIO module: " << ioModule);
        io = m->createStrategy();
        io->init();
    } else {
        debugs(47, DBG_CRITICAL, "FATAL: Rock store is missing DiskIO module: " <<
               ioModule);
        fatal("Rock Store missing a required DiskIO module");
    }

    theFile = io->newFile(filePath);
    theFile->configure(fileConfig);
    theFile->open(O_RDWR, 0644, this); // completion via ioCompletedNotification()

    // Increment early. Otherwise, if one SwapDir finishes rebuild before
    // others start, storeRebuildComplete() will think the rebuild is over!
    // TODO: move store_dirs_rebuilding hack to store modules that need it.
    ++StoreController::store_dirs_rebuilding;
}
257
258 bool
259 Rock::SwapDir::needsDiskStrand() const
260 {
261 const bool wontEvenWorkWithoutDisker = Config.workers > 1;
262 const bool wouldWorkBetterWithDisker = DiskIOModule::Find("IpcIo");
263 return InDaemonMode() && (wontEvenWorkWithoutDisker ||
264 wouldWorkBetterWithDisker);
265 }
266
/// configures this cache_dir from a squid.conf cache_dir line
void
Rock::SwapDir::parse(int anIndex, char *aPath)
{
    index = anIndex;

    path = xstrdup(aPath);

    // cache store is located at path/db
    String fname(path);
    fname.append("/rock");
    filePath = xstrdup(fname.termedBuf());

    parseSize(false);
    parseOptions(0);

    // Current openForWriting() code overwrites the old slot if needed
    // and possible, so proactively removing old slots is probably useless.
    assert(!repl); // repl = createRemovalPolicy(Config.replPolicy);

    validateOptions();
}
288
/// handles "squid -k reconfigure"; options that cannot change at runtime
/// produce warnings inside the parsers instead of being applied
void
Rock::SwapDir::reconfigure()
{
    parseSize(true);
    parseOptions(1);
    // TODO: can we reconfigure the replacement policy (repl)?
    validateOptions();
}
297
/// parse maximum db disk size
void
Rock::SwapDir::parseSize(const bool reconfig)
{
    const int i = GetInteger();
    if (i < 0)
        fatal("negative Rock cache_dir size value");
    const uint64_t new_max_size =
        static_cast<uint64_t>(i) << 20; // MBytes to Bytes
    if (!reconfig)
        max_size = new_max_size;
    else if (new_max_size != max_size) {
        // db size cannot change at runtime; warn and keep the old value
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir '" << path << "' size "
               "cannot be changed dynamically, value left unchanged (" <<
               (max_size >> 20) << " MB)");
    }
}
315
/// appends rock-specific option parsers to the inherited ::SwapDir set
ConfigOption *
Rock::SwapDir::getOptionTree() const
{
    ConfigOptionVector *vector = dynamic_cast<ConfigOptionVector*>(::SwapDir::getOptionTree());
    assert(vector);
    // const_cast because the ConfigOptionAdapter interface binds non-const
    // parse methods to this (logically mutable during [re]configuration) object
    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseTimeOption, &SwapDir::dumpTimeOption));
    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseRateOption, &SwapDir::dumpRateOption));
    return vector;
}
325
326 bool
327 Rock::SwapDir::allowOptionReconfigure(const char *const option) const
328 {
329 return strcmp(option, "max-size") != 0 &&
330 ::SwapDir::allowOptionReconfigure(option);
331 }
332
/// parses time-specific options; mimics ::SwapDir::optionObjectSizeParse()
bool
Rock::SwapDir::parseTimeOption(char const *option, const char *value, int reconfig)
{
    // TODO: ::SwapDir or, better, Config should provide time-parsing routines,
    // including time unit handling. Same for size.

    // select the configuration field matching the option name
    time_msec_t *storedTime;
    if (strcmp(option, "swap-timeout") == 0)
        storedTime = &fileConfig.ioTimeout;
    else
        return false; // not a time option; let other parsers handle it

    if (!value)
        self_destruct();

    // TODO: handle time units and detect parsing errors better
    const int64_t parsedValue = strtoll(value, NULL, 10);
    if (parsedValue < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
        self_destruct();
    }

    const time_msec_t newTime = static_cast<time_msec_t>(parsedValue);

    if (!reconfig)
        *storedTime = newTime;
    else if (*storedTime != newTime) {
        // this option cannot be changed by "squid -k reconfigure"
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedTime);
    }

    return true;
}
368
369 /// reports time-specific options; mimics ::SwapDir::optionObjectSizeDump()
370 void
371 Rock::SwapDir::dumpTimeOption(StoreEntry * e) const
372 {
373 if (fileConfig.ioTimeout)
374 storeAppendPrintf(e, " swap-timeout=%" PRId64,
375 static_cast<int64_t>(fileConfig.ioTimeout));
376 }
377
378 /// parses rate-specific options; mimics ::SwapDir::optionObjectSizeParse()
379 bool
380 Rock::SwapDir::parseRateOption(char const *option, const char *value, int isaReconfig)
381 {
382 int *storedRate;
383 if (strcmp(option, "max-swap-rate") == 0)
384 storedRate = &fileConfig.ioRate;
385 else
386 return false;
387
388 if (!value)
389 self_destruct();
390
391 // TODO: handle time units and detect parsing errors better
392 const int64_t parsedValue = strtoll(value, NULL, 10);
393 if (parsedValue < 0) {
394 debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
395 self_destruct();
396 }
397
398 const int newRate = static_cast<int>(parsedValue);
399
400 if (newRate < 0) {
401 debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << newRate);
402 self_destruct();
403 }
404
405 if (!isaReconfig)
406 *storedRate = newRate;
407 else if (*storedRate != newRate) {
408 debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
409 << " cannot be changed dynamically, value left unchanged: " <<
410 *storedRate);
411 }
412
413 return true;
414 }
415
416 /// reports rate-specific options; mimics ::SwapDir::optionObjectSizeDump()
417 void
418 Rock::SwapDir::dumpRateOption(StoreEntry * e) const
419 {
420 if (fileConfig.ioRate >= 0)
421 storeAppendPrintf(e, " max-swap-rate=%d", fileConfig.ioRate);
422 }
423
/// check the results of the configuration; only level-0 debugging works here
void
Rock::SwapDir::validateOptions()
{
    if (max_objsize <= 0)
        fatal("Rock store requires a positive max-size");

    // estimate how much configured space cannot be used: the db size is
    // given in whole MBs and the usable area is whole max-size cells
    const int64_t maxSizeRoundingWaste = 1024 * 1024; // size is configured in MB
    const int64_t maxObjectSizeRoundingWaste = maxObjectSize();
    const int64_t maxRoundingWaste =
        max(maxSizeRoundingWaste, maxObjectSizeRoundingWaste);
    const int64_t usableDiskSize = diskOffset(entryLimitAllowed());
    const int64_t diskWasteSize = maxSize() - usableDiskSize;
    Must(diskWasteSize >= 0);

    // warn if maximum db size is not reachable due to sfileno limit
    if (entryLimitAllowed() == entryLimitHigh() &&
            diskWasteSize >= maxRoundingWaste) {
        debugs(47, DBG_CRITICAL, "Rock store cache_dir[" << index << "] '" << path << "':");
        debugs(47, DBG_CRITICAL, "\tmaximum number of entries: " << entryLimitAllowed());
        debugs(47, DBG_CRITICAL, "\tmaximum object size: " << maxObjectSize() << " Bytes");
        debugs(47, DBG_CRITICAL, "\tmaximum db size: " << maxSize() << " Bytes");
        debugs(47, DBG_CRITICAL, "\tusable db size: " << usableDiskSize << " Bytes");
        debugs(47, DBG_CRITICAL, "\tdisk space waste: " << diskWasteSize << " Bytes");
        debugs(47, DBG_CRITICAL, "WARNING: Rock store config wastes space.");
    }
}
451
/// starts the asynchronous job that indexes the existing db file contents
void
Rock::SwapDir::rebuild()
{
    //++StoreController::store_dirs_rebuilding; // see Rock::SwapDir::init()
    AsyncJob::Start(new Rebuild(this));
}
458
/* Add a new object to the cache with empty memory copy and pointer to disk
 * use to rebuild store from disk. Based on UFSSwapDir::addDiskRestore */
bool
Rock::SwapDir::addEntry(const int filen, const DbCellHeader &header, const StoreEntry &from)
{
    debugs(47, 8, HERE << &from << ' ' << from.getMD5Text() <<
           ", filen="<< std::setfill('0') << std::hex << std::uppercase <<
           std::setw(8) << filen);

    sfileno newLocation = 0;
    if (Ipc::StoreMapSlot *slot = map->openForWriting(reinterpret_cast<const cache_key *>(from.key), newLocation)) {
        if (filen == newLocation) {
            slot->set(from);
            map->extras(filen) = header;
        } // else some other, newer entry got into our cell
        map->closeForWriting(newLocation, false);
        // success only if the entry landed back in its original cell
        return filen == newLocation;
    }

    return false;
}
480
/// whether we may accept the entry for swap-out; on success also reports
/// the current I/O load via the load out-parameter
bool
Rock::SwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const
{
    // account for our per-entry cell header in the size check
    if (!::SwapDir::canStore(e, sizeof(DbCellHeader)+diskSpaceNeeded, load))
        return false;

    if (!theFile || !theFile->canWrite())
        return false;

    if (!map)
        return false;

    // Do not start I/O transaction if there are less than 10% free pages left.
    // TODO: reserve page instead
    if (needsDiskStrand() &&
            Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
        debugs(47, 5, HERE << "too few shared pages for IPC I/O left");
        return false;
    }

    if (io->shedLoad())
        return false;

    load = io->load();
    return true;
}
507
/// allocates a db slot for the entry and creates the swap-out I/O state
StoreIOState::Pointer
Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
{
    if (!theFile || theFile->error()) {
        debugs(47,4, HERE << theFile);
        return NULL;
    }

    // compute payload size for our cell header, using StoreEntry info
    // careful: e.objectLen() may still be negative here
    const int64_t expectedReplySize = e.mem_obj->expectedReplySize();
    assert(expectedReplySize >= 0); // must know to prevent cell overflows
    assert(e.mem_obj->swap_hdr_sz > 0);
    DbCellHeader header;
    header.payloadSize = e.mem_obj->swap_hdr_sz + expectedReplySize;
    const int64_t payloadEnd = sizeof(DbCellHeader) + header.payloadSize;
    assert(payloadEnd <= max_objsize); // the whole cell must fit one slot

    // lock a db slot for writing
    sfileno filen;
    Ipc::StoreMapSlot *const slot =
        map->openForWriting(reinterpret_cast<const cache_key *>(e.key), filen);
    if (!slot) {
        debugs(47, 5, HERE << "map->add failed");
        return NULL;
    }
    e.swap_file_sz = header.payloadSize; // and will be copied to the map
    slot->set(e);
    map->extras(filen) = header;

    // XXX: We rely on our caller, storeSwapOutStart(), to set e.fileno.
    // If that does not happen, the entry will not decrement the read level!

    IoState *sio = new IoState(this, &e, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = filen;
    sio->payloadEnd = payloadEnd;
    sio->diskOffset = diskOffset(sio->swap_filen);

    debugs(47,5, HERE << "dir " << index << " created new filen " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen << std::dec << " at " << sio->diskOffset);

    assert(sio->diskOffset + payloadEnd <= diskOffsetLimit());

    sio->file(theFile);

    trackReferences(e);
    return sio;
}
558
559 int64_t
560 Rock::SwapDir::diskOffset(int filen) const
561 {
562 assert(filen >= 0);
563 return HeaderSize + max_objsize*filen;
564 }
565
566 int64_t
567 Rock::SwapDir::diskOffsetLimit() const
568 {
569 assert(map);
570 return diskOffset(map->entryLimit());
571 }
572
// tries to open an old or being-written-to entry with swap_filen for reading
StoreIOState::Pointer
Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
{
    if (!theFile || theFile->error()) {
        debugs(47,4, HERE << theFile);
        return NULL;
    }

    // the entry must already occupy a db slot to be readable
    if (e.swap_filen < 0) {
        debugs(47,4, HERE << e);
        return NULL;
    }

    // Do not start I/O transaction if there are less than 10% free pages left.
    // TODO: reserve page instead
    if (needsDiskStrand() &&
            Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
        debugs(47, 5, HERE << "too few shared pages for IPC I/O left");
        return NULL;
    }

    // There are two ways an entry can get swap_filen: our get() locked it for
    // reading or our storeSwapOutStart() locked it for writing. Peeking at our
    // locked entry is safe, but no support for reading a filling entry.
    const Ipc::StoreMapSlot *slot = map->peekAtReader(e.swap_filen);
    if (!slot)
        return NULL; // we were writing after all

    IoState *sio = new IoState(this, &e, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = e.swap_filen;
    // the payload size was recorded in the map extras at swap-out time
    sio->payloadEnd = sizeof(DbCellHeader) + map->extras(e.swap_filen).payloadSize;
    assert(sio->payloadEnd <= max_objsize); // the payload fits the slot

    debugs(47,5, HERE << "dir " << index << " has old filen: " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen);

    assert(slot->basics.swap_file_sz > 0);
    assert(slot->basics.swap_file_sz == e.swap_file_sz);

    sio->diskOffset = diskOffset(sio->swap_filen);
    assert(sio->diskOffset + sio->payloadEnd <= diskOffsetLimit());

    sio->file(theFile);
    return sio;
}
622
/// DiskIO callback: the attempt to open our db file has completed
void
Rock::SwapDir::ioCompletedNotification()
{
    if (!theFile)
        fatalf("Rock cache_dir failed to initialize db file: %s", filePath);

    if (theFile->error())
        fatalf("Rock cache_dir at %s failed to open db file: %s", filePath,
               xstrerror());

    debugs(47, 2, "Rock cache_dir[" << index << "] limits: " <<
           std::setw(12) << maxSize() << " disk bytes and " <<
           std::setw(7) << map->entryLimit() << " entries");

    // the db file is usable; start indexing its contents
    rebuild();
}
639
/// DiskIO callback: our db file has been closed; drop our reference to it
void
Rock::SwapDir::closeCompleted()
{
    theFile = NULL;
}
645
/// DiskIO callback: a read from our db file has completed
void
Rock::SwapDir::readCompleted(const char *buf, int rlen, int errflag, RefCount< ::ReadRequest> r)
{
    ReadRequest *request = dynamic_cast<Rock::ReadRequest*>(r.getRaw());
    assert(request);
    IoState::Pointer sio = request->sio;

    if (errflag == DISK_OK && rlen > 0)
        sio->offset_ += rlen; // advance past the bytes we just read
    assert(sio->diskOffset + sio->offset_ <= diskOffsetLimit()); // post-factum

    // detach the stored read callback before invoking it
    StoreIOState::STRCB *callb = sio->read.callback;
    assert(callb);
    sio->read.callback = NULL;
    void *cbdata;
    // only call back if the callback data is still valid
    if (cbdataReferenceValidDone(sio->read.callback_data, &cbdata))
        callb(cbdata, r->buf, rlen, sio.getRaw());
}
664
/// DiskIO callback: a write to our db file has completed
void
Rock::SwapDir::writeCompleted(int errflag, size_t rlen, RefCount< ::WriteRequest> r)
{
    Rock::WriteRequest *request = dynamic_cast<Rock::WriteRequest*>(r.getRaw());
    assert(request);
    assert(request->sio !=  NULL);
    IoState &sio = *request->sio;

    if (errflag == DISK_OK) {
        // close, assuming we only write once; the entry gets the read lock
        map->closeForWriting(sio.swap_filen, true);
        // do not increment sio.offset_ because we do it in sio->write()
    } else {
        // Do not abortWriting here. The entry should keep the write lock
        // instead of losing association with the store and confusing core.
        map->free(sio.swap_filen); // will mark as unusable, just in case
    }

    assert(sio.diskOffset + sio.offset_ <= diskOffsetLimit()); // post-factum

    sio.finishedWriting(errflag);
}
687
688 bool
689 Rock::SwapDir::full() const
690 {
691 return map && map->full();
692 }
693
// storeSwapOutFileClosed calls this method on DISK_NO_SPACE_LEFT,
// but it should not happen for us
void
Rock::SwapDir::diskFull()
{
    // the db file is sized up front by create(), so running out of space
    // mid-swap-out indicates a logic error somewhere
    debugs(20, DBG_IMPORTANT, "BUG: No space left with rock cache_dir: " <<
           filePath);
}
702
/// purge while full(); it should be sufficient to purge just one
void
Rock::SwapDir::maintain()
{
    // log which of the guard conditions below could stop the purge
    debugs(47,3, HERE << "cache_dir[" << index << "] guards: " <<
           !repl << !map << !full() << StoreController::store_dirs_rebuilding);

    if (!repl)
        return; // no means (cannot find a victim)

    if (!map)
        return; // no victims (yet)

    if (!full())
        return; // no need (to find a victim)

    // XXX: UFSSwapDir::maintain says we must quit during rebuild
    if (StoreController::store_dirs_rebuilding)
        return;

    debugs(47,3, HERE << "cache_dir[" << index << "] state: " << map->full() <<
           ' ' << currentSize() << " < " << diskOffsetLimit());

    // Hopefully, we find a removable entry much sooner (TODO: use time?)
    const int maxProbed = 10000;
    RemovalPurgeWalker *walker = repl->PurgeInit(repl, maxProbed);

    // It really should not take that long, but this will stop "infinite" loops
    const int maxFreed = 1000;
    int freed = 0;
    // TODO: should we purge more than needed to minimize overheads?
    for (; freed < maxFreed && full(); ++freed) {
        if (StoreEntry *e = walker->Next(walker))
            e->release(); // will call our unlink() method
        else
            break; // no more objects
    }

    debugs(47,2, HERE << "Rock cache_dir[" << index << "] freed " << freed <<
           " scanned " << walker->scanned << '/' << walker->locked);

    walker->Done(walker);

    if (full()) {
        debugs(47, DBG_CRITICAL, "ERROR: Rock cache_dir[" << index << "] " <<
               "is still full after freeing " << freed << " entries. A bug?");
    }
}
751
752 void
753 Rock::SwapDir::reference(StoreEntry &e)
754 {
755 debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
756 if (repl && repl->Referenced)
757 repl->Referenced(repl, &e, &e.repl);
758 }
759
760 bool
761 Rock::SwapDir::dereference(StoreEntry &e, bool)
762 {
763 debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
764 if (repl && repl->Dereferenced)
765 repl->Dereferenced(repl, &e, &e.repl);
766
767 // no need to keep e in the global store_table for us; we have our own map
768 return false;
769 }
770
771 bool
772 Rock::SwapDir::unlinkdUseful() const
773 {
774 // no entry-specific files to unlink
775 return false;
776 }
777
/// removes the entry from our map and severs its association with us
void
Rock::SwapDir::unlink(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    ignoreReferences(e);
    map->free(e.swap_filen); // releases the entry's db slot
    disconnect(e);
}
786
787 void
788 Rock::SwapDir::trackReferences(StoreEntry &e)
789 {
790 debugs(47, 5, HERE << e);
791 if (repl)
792 repl->Add(repl, &e, &e.repl);
793 }
794
795 void
796 Rock::SwapDir::ignoreReferences(StoreEntry &e)
797 {
798 debugs(47, 5, HERE << e);
799 if (repl)
800 repl->Remove(repl, &e, &e.repl);
801 }
802
/// appends this cache_dir's utilization statistics to a cachemgr report
void
Rock::SwapDir::statfs(StoreEntry &e) const
{
    storeAppendPrintf(&e, "\n");
    storeAppendPrintf(&e, "Maximum Size: %" PRIu64 " KB\n", maxSize() >> 10);
    storeAppendPrintf(&e, "Current Size: %.2f KB %.2f%%\n",
                      currentSize() / 1024.0,
                      Math::doublePercent(currentSize(), maxSize()));

    // entry counts are only available after init() creates the map
    if (map) {
        const int limit = map->entryLimit();
        storeAppendPrintf(&e, "Maximum entries: %9d\n", limit);
        if (limit > 0) {
            const int entryCount = map->entryCount();
            storeAppendPrintf(&e, "Current entries: %9d %.2f%%\n",
                              entryCount, (100.0 * entryCount / limit));

            if (limit < 100) { // XXX: otherwise too expensive to count
                Ipc::ReadWriteLockStats stats;
                map->updateStats(stats);
                stats.dump(e);
            }
        }
    }

    storeAppendPrintf(&e, "Pending operations: %d out of %d\n",
                      store_open_disk_fd, Config.max_open_disk_fds);

    storeAppendPrintf(&e, "Flags:");

    if (flags.selected)
        storeAppendPrintf(&e, " SELECTED");

    if (flags.read_only)
        storeAppendPrintf(&e, " READ-ONLY");

    storeAppendPrintf(&e, "\n");

}
842
namespace Rock
{
/// registers SwapDirRr to run during the rrAfterConfig registration stage
RunnerRegistrationEntry(rrAfterConfig, SwapDirRr);
}
847
/// creates a shared map owner for every configured rock cache_dir
void Rock::SwapDirRr::create(const RunnerRegistry &)
{
    Must(owners.empty());
    for (int i = 0; i < Config.cacheSwap.n_configured; ++i) {
        // only rock cache_dirs need a shared map owner
        if (const Rock::SwapDir *const sd = dynamic_cast<Rock::SwapDir *>(INDEXSD(i))) {
            Rock::SwapDir::DirMap::Owner *const owner =
                Rock::SwapDir::DirMap::Init(sd->path, sd->entryLimitAllowed());
            owners.push_back(owner); // deleted by ~SwapDirRr()
        }
    }
}
859
860 Rock::SwapDirRr::~SwapDirRr()
861 {
862 for (size_t i = 0; i < owners.size(); ++i)
863 delete owners[i];
864 }