/*
 * DEBUG: section 47    Store Directory Routines
 */

#include "squid.h"
#include "cache_cf.h"
#include "ConfigOption.h"
#include "DiskIO/DiskIOModule.h"
#include "DiskIO/DiskIOStrategy.h"
#include "DiskIO/ReadRequest.h"
#include "DiskIO/WriteRequest.h"
#include "fs/rock/RockSwapDir.h"
#include "fs/rock/RockIoState.h"
#include "fs/rock/RockIoRequests.h"
#include "fs/rock/RockRebuild.h"
#include "globals.h"
#include "ipc/mem/Pages.h"
#include "MemObject.h"
#include "Parsing.h"
#include "SquidConfig.h"
#include "SquidMath.h"
#include "tools.h"

#include <cstdlib>
#include <iomanip>

#if HAVE_SYS_STAT_H
#include <sys/stat.h>
#endif

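/// The first HeaderSize bytes of the db file are reserved (zero-filled at
/// creation time); fixed-size entry slots start right after the header
/// (see diskOffset()).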
const int64_t Rock::SwapDir::HeaderSize = 16*1024;

Rock::SwapDir::SwapDir(): ::SwapDir("rock"), filePath(NULL), io(NULL), map(NULL)
{
}

Rock::SwapDir::~SwapDir()
{
    delete io;
    delete map;
    safe_free(filePath);
}

StoreSearch *
Rock::SwapDir::search(String const url, HttpRequest *)
{
    assert(false);
    return NULL; // XXX: implement
}

void
Rock::SwapDir::get(String const key, STOREGETCLIENT cb, void *data)
{
    ::SwapDir::get(key, cb, data);
}

// called when Squid core needs a StoreEntry with a given key
StoreEntry *
Rock::SwapDir::get(const cache_key *key)
{
    if (!map || !theFile || !theFile->canRead())
        return NULL;

    sfileno filen;
    const Ipc::StoreMapSlot *const slot = map->openForReading(key, filen);
    if (!slot)
        return NULL;

    const Ipc::StoreMapSlot::Basics &basics = slot->basics;

    // create a brand new store entry and initialize it with stored basics
    StoreEntry *e = new StoreEntry();
    e->lock_count = 0;
    e->swap_dirn = index;
    e->swap_filen = filen;
    e->swap_file_sz = basics.swap_file_sz;
    e->lastref = basics.lastref;
    e->timestamp = basics.timestamp;
    e->expires = basics.expires;
    e->lastmod = basics.lastmod;
    e->refcount = basics.refcount;
    e->flags = basics.flags;
    e->store_status = STORE_OK;
    e->setMemStatus(NOT_IN_MEMORY);
    e->swap_status = SWAPOUT_DONE;
    e->ping_status = PING_NONE;
    EBIT_SET(e->flags, ENTRY_CACHABLE);
    EBIT_CLR(e->flags, RELEASE_REQUEST);
    EBIT_CLR(e->flags, KEY_PRIVATE);
    EBIT_SET(e->flags, ENTRY_VALIDATED);
    e->hashInsert(key);
    trackReferences(*e);

    return e;
    // the disk entry remains open for reading, protected from modifications
}

void Rock::SwapDir::disconnect(StoreEntry &e)
{
    assert(e.swap_dirn == index);
    assert(e.swap_filen >= 0);
    // cannot have SWAPOUT_NONE entry with swap_filen >= 0
    assert(e.swap_status != SWAPOUT_NONE);

    // do not rely on e.swap_status here because there is an async delay
    // before it switches from SWAPOUT_WRITING to SWAPOUT_DONE.

    // since e has swap_filen, its slot is locked for either reading or writing
    map->abortIo(e.swap_filen);
    e.swap_dirn = -1;
    e.swap_filen = -1;
    e.swap_status = SWAPOUT_NONE;
}

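/// size estimate: every entry occupies a whole fixed-size slot of
/// max_objsize bytes, in addition to the shared db header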
uint64_t
Rock::SwapDir::currentSize() const
{
    return HeaderSize + max_objsize * currentCount();
}

uint64_t
Rock::SwapDir::currentCount() const
{
    return map ? map->entryCount() : 0;
}

/// In SMP mode, only the disker process reports stats so that the same
/// stats are not counted by multiple processes.
bool
Rock::SwapDir::doReportStat() const
{
    return ::SwapDir::doReportStat() && (!UsingSmp() || IamDiskProcess());
}

void
Rock::SwapDir::swappedOut(const StoreEntry &)
{
    // stats are not stored but computed when needed
}

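/// the number of entries this dir may hold: the count implied by the
/// configured db and slot sizes, clamped from below by the current map
/// limit (the map cannot shrink) and from above by the sfileno-based
/// hard limit (entryLimitHigh())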
int64_t
Rock::SwapDir::entryLimitAllowed() const
{
    const int64_t eLimitLo = map ? map->entryLimit() : 0; // dynamic shrinking unsupported
    const int64_t eWanted = (maxSize() - HeaderSize)/maxObjectSize();
    return min(max(eLimitLo, eWanted), entryLimitHigh());
}

// TODO: encapsulate as a tool; identical to CossSwapDir::create()
void
Rock::SwapDir::create()
{
    assert(path);
    assert(filePath);

    if (UsingSmp() && !IamDiskProcess()) {
        debugs(47,3, HERE << "disker will create in " << path);
        return;
    }

    debugs(47,3, HERE << "creating in " << path);

    struct stat swap_sb;
    if (::stat(path, &swap_sb) < 0) {
        debugs(47, DBG_IMPORTANT, "Creating Rock db directory: " << path);
        const int res = mkdir(path, 0700);
        if (res != 0) {
            debugs(47, DBG_CRITICAL, "Failed to create Rock db dir " << path <<
                   ": " << xstrerror());
            fatal("Rock Store db creation error");
        }
    }

#if SLOWLY_FILL_WITH_ZEROS
    char block[1024];
    Must(maxSize() % sizeof(block) == 0);
    memset(block, '\0', sizeof(block));

    const int swap = open(filePath, O_WRONLY|O_CREAT|O_TRUNC|O_BINARY, 0600);
    for (off_t offset = 0; offset < maxSize(); offset += sizeof(block)) {
        if (write(swap, block, sizeof(block)) != sizeof(block)) {
            debugs(47, DBG_CRITICAL, "ERROR: Failed to create Rock Store db in " << filePath <<
                   ": " << xstrerror());
            fatal("Rock Store db creation error");
        }
    }
    close(swap);
#else
    const int swap = open(filePath, O_WRONLY|O_CREAT|O_TRUNC|O_BINARY, 0600);
    if (swap < 0) {
        debugs(47, DBG_CRITICAL, "ERROR: Failed to initialize Rock Store db in " << filePath <<
               "; create error: " << xstrerror());
        fatal("Rock Store db creation error");
    }

    if (ftruncate(swap, maxSize()) != 0) {
        debugs(47, DBG_CRITICAL, "ERROR: Failed to initialize Rock Store db in " << filePath <<
               "; truncate error: " << xstrerror());
        fatal("Rock Store db creation error");
    }

    char header[HeaderSize];
    memset(header, '\0', sizeof(header));
    if (write(swap, header, sizeof(header)) != sizeof(header)) {
        debugs(47, DBG_CRITICAL, "ERROR: Failed to initialize Rock Store db in " << filePath <<
               "; write error: " << xstrerror());
        fatal("Rock Store db initialization error");
    }
    close(swap);
#endif
}

void
Rock::SwapDir::init()
{
    debugs(47,2, HERE);

    // XXX: SwapDirs aren't refcounted. We make IORequestor calls, which
    // are refcounted. We up our count once to avoid implicit delete's.
    RefCountReference();

    Must(!map);
    map = new DirMap(path);

    const char *ioModule = needsDiskStrand() ? "IpcIo" : "Blocking";
    if (DiskIOModule *m = DiskIOModule::Find(ioModule)) {
        debugs(47,2, HERE << "Using DiskIO module: " << ioModule);
        io = m->createStrategy();
        io->init();
    } else {
        debugs(47, DBG_CRITICAL, "FATAL: Rock store is missing DiskIO module: " <<
               ioModule);
        fatal("Rock Store missing a required DiskIO module");
    }

    theFile = io->newFile(filePath);
    theFile->configure(fileConfig);
    theFile->open(O_RDWR, 0644, this);

    // Increment early. Otherwise, if one SwapDir finishes rebuild before
    // others start, storeRebuildComplete() will think the rebuild is over!
    // TODO: move store_dirs_rebuilding hack to store modules that need it.
    ++StoreController::store_dirs_rebuilding;
}

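/// whether this cache_dir requires a dedicated disker process: multi-worker
/// configurations cannot work without one, and single-worker daemons still
/// work better with one when the IpcIo DiskIO module is available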
bool
Rock::SwapDir::needsDiskStrand() const
{
    const bool wontEvenWorkWithoutDisker = Config.workers > 1;
    const bool wouldWorkBetterWithDisker = DiskIOModule::Find("IpcIo");
    return InDaemonMode() && (wontEvenWorkWithoutDisker ||
                              wouldWorkBetterWithDisker);
}

void
Rock::SwapDir::parse(int anIndex, char *aPath)
{
    index = anIndex;

    path = xstrdup(aPath);

    // cache store is located at path/rock
    String fname(path);
    fname.append("/rock");
    filePath = xstrdup(fname.termedBuf());

    parseSize(false);
    parseOptions(0);

    // Current openForWriting() code overwrites the old slot if needed
    // and possible, so proactively removing old slots is probably useless.
    assert(!repl); // repl = createRemovalPolicy(Config.replPolicy);

    validateOptions();
}

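// A squid.conf line exercising this parser might look like this (the path
// and values are illustrative; the first number is the db size in MBytes,
// while swap-timeout and max-swap-rate are handled by the option parsers
// further below):
//
//   cache_dir rock /var/spool/squid/rock 512 max-size=32768 swap-timeout=300 max-swap-rate=200
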
void
Rock::SwapDir::reconfigure()
{
    parseSize(true);
    parseOptions(1);
    // TODO: can we reconfigure the replacement policy (repl)?
    validateOptions();
}

/// parse maximum db disk size
void
Rock::SwapDir::parseSize(const bool reconfiguring)
{
    const int i = GetInteger();
    if (i < 0)
        fatal("negative Rock cache_dir size value");
    const uint64_t new_max_size =
        static_cast<uint64_t>(i) << 20; // MBytes to Bytes
    if (!reconfiguring)
        max_size = new_max_size;
    else if (new_max_size != max_size) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir '" << path << "' size "
               "cannot be changed dynamically, value left unchanged (" <<
               (max_size >> 20) << " MB)");
    }
}

ConfigOption *
Rock::SwapDir::getOptionTree() const
{
    ConfigOptionVector *vector = dynamic_cast<ConfigOptionVector*>(::SwapDir::getOptionTree());
    assert(vector);
    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseTimeOption, &SwapDir::dumpTimeOption));
    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseRateOption, &SwapDir::dumpRateOption));
    return vector;
}

bool
Rock::SwapDir::allowOptionReconfigure(const char *const option) const
{
    return strcmp(option, "max-size") != 0 &&
           ::SwapDir::allowOptionReconfigure(option);
}

/// parses time-specific options; mimics ::SwapDir::optionObjectSizeParse()
bool
Rock::SwapDir::parseTimeOption(char const *option, const char *value, int reconfiguring)
{
    // TODO: ::SwapDir or, better, Config should provide time-parsing routines,
    // including time unit handling. Same for size.

    time_msec_t *storedTime;
    if (strcmp(option, "swap-timeout") == 0)
        storedTime = &fileConfig.ioTimeout;
    else
        return false;

    if (!value)
        self_destruct();

    // TODO: handle time units and detect parsing errors better
    const int64_t parsedValue = strtoll(value, NULL, 10);
    if (parsedValue < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
        self_destruct();
    }

    const time_msec_t newTime = static_cast<time_msec_t>(parsedValue);

    if (!reconfiguring)
        *storedTime = newTime;
    else if (*storedTime != newTime) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedTime);
    }

    return true;
}

/// reports time-specific options; mimics ::SwapDir::optionObjectSizeDump()
void
Rock::SwapDir::dumpTimeOption(StoreEntry * e) const
{
    if (fileConfig.ioTimeout)
        storeAppendPrintf(e, " swap-timeout=%" PRId64,
                          static_cast<int64_t>(fileConfig.ioTimeout));
}

/// parses rate-specific options; mimics ::SwapDir::optionObjectSizeParse()
bool
Rock::SwapDir::parseRateOption(char const *option, const char *value, int isaReconfig)
{
    int *storedRate;
    if (strcmp(option, "max-swap-rate") == 0)
        storedRate = &fileConfig.ioRate;
    else
        return false;

    if (!value)
        self_destruct();

    // TODO: handle time units and detect parsing errors better
    const int64_t parsedValue = strtoll(value, NULL, 10);
    if (parsedValue < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
        self_destruct();
    }

    const int newRate = static_cast<int>(parsedValue);

    if (newRate < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << newRate);
        self_destruct();
    }

    if (!isaReconfig)
        *storedRate = newRate;
    else if (*storedRate != newRate) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedRate);
    }

    return true;
}

/// reports rate-specific options; mimics ::SwapDir::optionObjectSizeDump()
void
Rock::SwapDir::dumpRateOption(StoreEntry * e) const
{
    if (fileConfig.ioRate >= 0)
        storeAppendPrintf(e, " max-swap-rate=%d", fileConfig.ioRate);
}

/// check the results of the configuration; only level-0 debugging works here
void
Rock::SwapDir::validateOptions()
{
    if (max_objsize <= 0)
        fatal("Rock store requires a positive max-size");

    const int64_t maxSizeRoundingWaste = 1024 * 1024; // size is configured in MB
    const int64_t maxObjectSizeRoundingWaste = maxObjectSize();
    const int64_t maxRoundingWaste =
        max(maxSizeRoundingWaste, maxObjectSizeRoundingWaste);
    const int64_t usableDiskSize = diskOffset(entryLimitAllowed());
    const int64_t diskWasteSize = maxSize() - usableDiskSize;
    Must(diskWasteSize >= 0);

    // warn if maximum db size is not reachable due to sfileno limit
    if (entryLimitAllowed() == entryLimitHigh() &&
            diskWasteSize >= maxRoundingWaste) {
        debugs(47, DBG_CRITICAL, "Rock store cache_dir[" << index << "] '" << path << "':");
        debugs(47, DBG_CRITICAL, "\tmaximum number of entries: " << entryLimitAllowed());
        debugs(47, DBG_CRITICAL, "\tmaximum object size: " << maxObjectSize() << " Bytes");
        debugs(47, DBG_CRITICAL, "\tmaximum db size: " << maxSize() << " Bytes");
        debugs(47, DBG_CRITICAL, "\tusable db size: " << usableDiskSize << " Bytes");
        debugs(47, DBG_CRITICAL, "\tdisk space waste: " << diskWasteSize << " Bytes");
        debugs(47, DBG_CRITICAL, "WARNING: Rock store config wastes space.");
    }
}

void
Rock::SwapDir::rebuild()
{
    //++StoreController::store_dirs_rebuilding; // see Rock::SwapDir::init()
    AsyncJob::Start(new Rebuild(this));
}

/* Add a new object to the cache with an empty memory copy and a pointer to
 * disk; used to rebuild the store from disk. Based on UFSSwapDir::addDiskRestore */
bool
Rock::SwapDir::addEntry(const int filen, const DbCellHeader &header, const StoreEntry &from)
{
    debugs(47, 8, HERE << &from << ' ' << from.getMD5Text() <<
           ", filen="<< std::setfill('0') << std::hex << std::uppercase <<
           std::setw(8) << filen);

    sfileno newLocation = 0;
    if (Ipc::StoreMapSlot *slot = map->openForWriting(reinterpret_cast<const cache_key *>(from.key), newLocation)) {
        if (filen == newLocation) {
            slot->set(from);
            map->extras(filen) = header;
        } // else some other, newer entry got into our cell
        map->closeForWriting(newLocation, false);
        return filen == newLocation;
    }

    return false;
}

bool
Rock::SwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const
{
    if (!::SwapDir::canStore(e, sizeof(DbCellHeader)+diskSpaceNeeded, load))
        return false;

    if (!theFile || !theFile->canWrite())
        return false;

    if (!map)
        return false;

    // Do not start an I/O transaction if less than 10% of the shared
    // I/O pages are free. TODO: reserve a page instead.
    if (needsDiskStrand() &&
            Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
        debugs(47, 5, HERE << "too few shared pages for IPC I/O left");
        return false;
    }

    if (io->shedLoad())
        return false;

    load = io->load();
    return true;
}

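/// opens a fresh db slot for writing entry e and wraps it in an IoState
/// that the core will use to swap the entry out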
StoreIOState::Pointer
Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
{
    if (!theFile || theFile->error()) {
        debugs(47,4, HERE << theFile);
        return NULL;
    }

    // compute payload size for our cell header, using StoreEntry info
    // careful: e.objectLen() may still be negative here
    const int64_t expectedReplySize = e.mem_obj->expectedReplySize();
    assert(expectedReplySize >= 0); // must know to prevent cell overflows
    assert(e.mem_obj->swap_hdr_sz > 0);
    DbCellHeader header;
    header.payloadSize = e.mem_obj->swap_hdr_sz + expectedReplySize;
    const int64_t payloadEnd = sizeof(DbCellHeader) + header.payloadSize;
    assert(payloadEnd <= max_objsize);

    sfileno filen;
    Ipc::StoreMapSlot *const slot =
        map->openForWriting(reinterpret_cast<const cache_key *>(e.key), filen);
    if (!slot) {
        debugs(47, 5, HERE << "map->add failed");
        return NULL;
    }
    e.swap_file_sz = header.payloadSize; // and will be copied to the map
    slot->set(e);
    map->extras(filen) = header;

    // XXX: We rely on our caller, storeSwapOutStart(), to set e.fileno.
    // If that does not happen, the entry will not decrement the read level!

    IoState *sio = new IoState(this, &e, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = filen;
    sio->payloadEnd = payloadEnd;
    sio->diskOffset = diskOffset(sio->swap_filen);

    debugs(47,5, HERE << "dir " << index << " created new filen " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen << std::dec << " at " << sio->diskOffset);

    assert(sio->diskOffset + payloadEnd <= diskOffsetLimit());

    sio->file(theFile);

    trackReferences(e);
    return sio;
}

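// Slot addresses are a simple linear mapping. For example, with the 16 KB
// header and an illustrative max_objsize of 32 KB, slot 2 would start at
// 16*1024 + 2*32*1024 = 81920 bytes into the db file.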
int64_t
Rock::SwapDir::diskOffset(int filen) const
{
    assert(filen >= 0);
    return HeaderSize + max_objsize*filen;
}

int64_t
Rock::SwapDir::diskOffsetLimit() const
{
    assert(map);
    return diskOffset(map->entryLimit());
}

// tries to open an old or being-written-to entry with swap_filen for reading
StoreIOState::Pointer
Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
{
    if (!theFile || theFile->error()) {
        debugs(47,4, HERE << theFile);
        return NULL;
    }

    if (e.swap_filen < 0) {
        debugs(47,4, HERE << e);
        return NULL;
    }

    // Do not start an I/O transaction if less than 10% of the shared
    // I/O pages are free. TODO: reserve a page instead.
    if (needsDiskStrand() &&
            Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
        debugs(47, 5, HERE << "too few shared pages for IPC I/O left");
        return NULL;
    }

    // There are two ways an entry can get swap_filen: our get() locked it for
    // reading or our storeSwapOutStart() locked it for writing. Peeking at our
    // own locked entry is safe, but reading a still-filling entry is not supported.
    const Ipc::StoreMapSlot *slot = map->peekAtReader(e.swap_filen);
    if (!slot)
        return NULL; // we were writing, after all

    IoState *sio = new IoState(this, &e, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = e.swap_filen;
    sio->payloadEnd = sizeof(DbCellHeader) + map->extras(e.swap_filen).payloadSize;
    assert(sio->payloadEnd <= max_objsize); // the payload fits the slot

    debugs(47,5, HERE << "dir " << index << " has old filen: " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen);

    assert(slot->basics.swap_file_sz > 0);
    assert(slot->basics.swap_file_sz == e.swap_file_sz);

    sio->diskOffset = diskOffset(sio->swap_filen);
    assert(sio->diskOffset + sio->payloadEnd <= diskOffsetLimit());

    sio->file(theFile);
    return sio;
}

void
Rock::SwapDir::ioCompletedNotification()
{
    if (!theFile)
        fatalf("Rock cache_dir failed to initialize db file: %s", filePath);

    if (theFile->error())
        fatalf("Rock cache_dir at %s failed to open db file: %s", filePath,
               xstrerror());

    debugs(47, 2, "Rock cache_dir[" << index << "] limits: " <<
           std::setw(12) << maxSize() << " disk bytes and " <<
           std::setw(7) << map->entryLimit() << " entries");

    rebuild();
}

void
Rock::SwapDir::closeCompleted()
{
    theFile = NULL;
}

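/// IORequestor callback: a read scheduled by our IoState has finished;
/// forward the data (or the error) to the store client's read callback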
void
Rock::SwapDir::readCompleted(const char *buf, int rlen, int errflag, RefCount< ::ReadRequest> r)
{
    ReadRequest *request = dynamic_cast<Rock::ReadRequest*>(r.getRaw());
    assert(request);
    IoState::Pointer sio = request->sio;

    if (errflag == DISK_OK && rlen > 0)
        sio->offset_ += rlen;
    assert(sio->diskOffset + sio->offset_ <= diskOffsetLimit()); // post-factum

    StoreIOState::STRCB *callback = sio->read.callback;
    assert(callback);
    sio->read.callback = NULL;
    void *cbdata;
    if (cbdataReferenceValidDone(sio->read.callback_data, &cbdata))
        callback(cbdata, r->buf, rlen, sio.getRaw());
}

void
Rock::SwapDir::writeCompleted(int errflag, size_t rlen, RefCount< ::WriteRequest> r)
{
    Rock::WriteRequest *request = dynamic_cast<Rock::WriteRequest*>(r.getRaw());
    assert(request);
    assert(request->sio != NULL);
    IoState &sio = *request->sio;

    if (errflag == DISK_OK) {
        // close, assuming we only write once; the entry gets the read lock
        map->closeForWriting(sio.swap_filen, true);
        // do not increment sio.offset_ because we do it in sio->write()
    } else {
        // Do not abortWriting here. The entry should keep the write lock
        // instead of losing association with the store and confusing core.
        map->free(sio.swap_filen); // will mark as unusable, just in case
    }

    assert(sio.diskOffset + sio.offset_ <= diskOffsetLimit()); // post-factum

    sio.finishedWriting(errflag);
}

bool
Rock::SwapDir::full() const
{
    return map && map->full();
}

// storeSwapOutFileClosed() calls this method on DISK_NO_SPACE_LEFT,
// but that should not happen for us
void
Rock::SwapDir::diskFull()
{
    debugs(20, DBG_IMPORTANT, "BUG: No space left with rock cache_dir: " <<
           filePath);
}

/// purge while full(); it should be sufficient to purge just one entry
void
Rock::SwapDir::maintain()
{
    debugs(47,3, HERE << "cache_dir[" << index << "] guards: " <<
           !repl << !map << !full() << StoreController::store_dirs_rebuilding);

    if (!repl)
        return; // no means (cannot find a victim)

    if (!map)
        return; // no victims (yet)

    if (!full())
        return; // no need (to find a victim)

    // XXX: UFSSwapDir::maintain says we must quit during rebuild
    if (StoreController::store_dirs_rebuilding)
        return;

    debugs(47,3, HERE << "cache_dir[" << index << "] state: " << map->full() <<
           ' ' << currentSize() << " < " << diskOffsetLimit());

    // Hopefully, we find a removable entry much sooner (TODO: use time?)
    const int maxProbed = 10000;
    RemovalPurgeWalker *walker = repl->PurgeInit(repl, maxProbed);

    // It really should not take that long, but this will stop "infinite" loops
    const int maxFreed = 1000;
    int freed = 0;
    // TODO: should we purge more than needed to minimize overheads?
    for (; freed < maxFreed && full(); ++freed) {
        if (StoreEntry *e = walker->Next(walker))
            e->release(); // will call our unlink() method
        else
            break; // no more objects
    }

    debugs(47,2, HERE << "Rock cache_dir[" << index << "] freed " << freed <<
           " scanned " << walker->scanned << '/' << walker->locked);

    walker->Done(walker);

    if (full()) {
        debugs(47, DBG_CRITICAL, "ERROR: Rock cache_dir[" << index << "] " <<
               "is still full after freeing " << freed << " entries. A bug?");
    }
}

void
Rock::SwapDir::reference(StoreEntry &e)
{
    debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
    if (repl && repl->Referenced)
        repl->Referenced(repl, &e, &e.repl);
}

bool
Rock::SwapDir::dereference(StoreEntry &e)
{
    debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
    if (repl && repl->Dereferenced)
        repl->Dereferenced(repl, &e, &e.repl);

    // no need to keep e in the global store_table for us; we have our own map
    return false;
}

bool
Rock::SwapDir::unlinkdUseful() const
{
    // no entry-specific files to unlink
    return false;
}

void
Rock::SwapDir::unlink(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    ignoreReferences(e);
    map->free(e.swap_filen);
    disconnect(e);
}

void
Rock::SwapDir::trackReferences(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    if (repl)
        repl->Add(repl, &e, &e.repl);
}

void
Rock::SwapDir::ignoreReferences(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    if (repl)
        repl->Remove(repl, &e, &e.repl);
}

void
Rock::SwapDir::statfs(StoreEntry &e) const
{
    storeAppendPrintf(&e, "\n");
    storeAppendPrintf(&e, "Maximum Size: %" PRIu64 " KB\n", maxSize() >> 10);
    storeAppendPrintf(&e, "Current Size: %.2f KB %.2f%%\n",
                      currentSize() / 1024.0,
                      Math::doublePercent(currentSize(), maxSize()));

    if (map) {
        const int limit = map->entryLimit();
        storeAppendPrintf(&e, "Maximum entries: %9d\n", limit);
        if (limit > 0) {
            const int entryCount = map->entryCount();
            storeAppendPrintf(&e, "Current entries: %9d %.2f%%\n",
                              entryCount, (100.0 * entryCount / limit));

            if (limit < 100) { // XXX: otherwise too expensive to count
                Ipc::ReadWriteLockStats stats;
                map->updateStats(stats);
                stats.dump(e);
            }
        }
    }

    storeAppendPrintf(&e, "Pending operations: %d out of %d\n",
                      store_open_disk_fd, Config.max_open_disk_fds);

    storeAppendPrintf(&e, "Flags:");

    if (flags.selected)
        storeAppendPrintf(&e, " SELECTED");

    if (flags.read_only)
        storeAppendPrintf(&e, " READ-ONLY");

    storeAppendPrintf(&e, "\n");
}

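/// registers the rrAfterConfig runner that creates the shared entry maps
/// for all configured rock cache_dirs once squid.conf has been parsed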
namespace Rock
{
RunnerRegistrationEntry(rrAfterConfig, SwapDirRr);
}

void Rock::SwapDirRr::create(const RunnerRegistry &)
{
    Must(owners.empty());
    for (int i = 0; i < Config.cacheSwap.n_configured; ++i) {
        if (const Rock::SwapDir *const sd = dynamic_cast<Rock::SwapDir *>(INDEXSD(i))) {
            Rock::SwapDir::DirMap::Owner *const owner =
                Rock::SwapDir::DirMap::Init(sd->path, sd->entryLimitAllowed());
            owners.push_back(owner);
        }
    }
}

Rock::SwapDirRr::~SwapDirRr()
{
    for (size_t i = 0; i < owners.size(); ++i)
        delete owners[i];
}