]> git.ipfire.org Git - thirdparty/squid.git/blob - src/fs/rock/RockSwapDir.cc
protos.h refactoring, part one.
[thirdparty/squid.git] / src / fs / rock / RockSwapDir.cc
1 /*
2 * $Id$
3 *
4 * DEBUG: section 47 Store Directory Routines
5 */
6
7 #include "squid.h"
8 #include "cache_cf.h"
9 #include "ConfigOption.h"
10 #include "DiskIO/DiskIOModule.h"
11 #include "DiskIO/DiskIOStrategy.h"
12 #include "DiskIO/ReadRequest.h"
13 #include "DiskIO/WriteRequest.h"
14 #include "fs/rock/RockSwapDir.h"
15 #include "fs/rock/RockIoState.h"
16 #include "fs/rock/RockIoRequests.h"
17 #include "fs/rock/RockRebuild.h"
18 #include "ipc/mem/Pages.h"
19 #include "MemObject.h"
20 #include "Parsing.h"
21 #include "protos.h"
22 #include "SquidMath.h"
23 #include <cstdlib>
24 #include <iomanip>
25
26 #if HAVE_SYS_STAT_H
27 #include <sys/stat.h>
28 #endif
29
/// Bytes reserved at the start of the db file for a header; entry slots
/// begin after this area (see diskOffset()).
const int64_t Rock::SwapDir::HeaderSize = 16*1024;
31
/// Starts with no db file path, I/O strategy, or entry map; those are
/// created later, in parse() and init().
Rock::SwapDir::SwapDir(): ::SwapDir("rock"), filePath(NULL), io(NULL), map(NULL)
{
}
35
Rock::SwapDir::~SwapDir()
{
    delete io; // DiskIO strategy created in init()
    delete map; // shared entry map created in init()
    safe_free(filePath); // xstrdup()ed in parse()
}
42
/// Store search is not implemented for rock; must not be called.
StoreSearch *
Rock::SwapDir::search(String const url, HttpRequest *)
{
    assert(false);
    return NULL; // XXX: implement
}
49
/// asynchronous lookup API; we just defer to the default SwapDir behavior
void
Rock::SwapDir::get(String const key, STOREGETCLIENT cb, void *data)
{
    ::SwapDir::get(key, cb, data);
}
55
// called when Squid core needs a StoreEntry with a given key
StoreEntry *
Rock::SwapDir::get(const cache_key *key)
{
    // cannot serve lookups before init() built the map and opened the file
    if (!map || !theFile || !theFile->canRead())
        return NULL;

    // openForReading() locks the slot for reading on success
    sfileno filen;
    const Ipc::StoreMapSlot *const slot = map->openForReading(key, filen);
    if (!slot)
        return NULL;

    const Ipc::StoreMapSlot::Basics &basics = slot->basics;

    // create a brand new store entry and initialize it with stored basics
    StoreEntry *e = new StoreEntry();
    e->lock_count = 0;
    e->swap_dirn = index;
    e->swap_filen = filen;
    e->swap_file_sz = basics.swap_file_sz;
    e->lastref = basics.lastref;
    e->timestamp = basics.timestamp;
    e->expires = basics.expires;
    e->lastmod = basics.lastmod;
    e->refcount = basics.refcount;
    e->flags = basics.flags;
    // the entry is fully on disk and not in memory
    e->store_status = STORE_OK;
    e->setMemStatus(NOT_IN_MEMORY);
    e->swap_status = SWAPOUT_DONE;
    e->ping_status = PING_NONE;
    EBIT_SET(e->flags, ENTRY_CACHABLE);
    EBIT_CLR(e->flags, RELEASE_REQUEST);
    EBIT_CLR(e->flags, KEY_PRIVATE);
    EBIT_SET(e->flags, ENTRY_VALIDATED);
    e->hashInsert(key);
    trackReferences(*e); // inform the removal policy, if any

    return e;
    // the disk entry remains open for reading, protected from modifications
}
96
/// Breaks the association between a StoreEntry and its on-disk slot,
/// releasing whatever map lock the entry held.
void Rock::SwapDir::disconnect(StoreEntry &e)
{
    assert(e.swap_dirn == index);
    assert(e.swap_filen >= 0);
    // cannot have SWAPOUT_NONE entry with swap_filen >= 0
    assert(e.swap_status != SWAPOUT_NONE);

    // do not rely on e.swap_status here because there is an async delay
    // before it switches from SWAPOUT_WRITING to SWAPOUT_DONE.

    // since e has swap_filen, its slot is locked for either reading or writing
    map->abortIo(e.swap_filen);
    // forget all swap details so the entry no longer refers to this dir
    e.swap_dirn = -1;
    e.swap_filen = -1;
    e.swap_status = SWAPOUT_NONE;
}
113
/// Estimated disk usage: the header plus one max-size slot per stored
/// entry (slots are fixed-size, so this is exact for slot accounting).
uint64_t
Rock::SwapDir::currentSize() const
{
    return HeaderSize + max_objsize * currentCount();
}
119
120 uint64_t
121 Rock::SwapDir::currentCount() const
122 {
123 return map ? map->entryCount() : 0;
124 }
125
126 /// In SMP mode only the disker process reports stats to avoid
127 /// counting the same stats by multiple processes.
128 bool
129 Rock::SwapDir::doReportStat() const
130 {
131 return ::SwapDir::doReportStat() && (!UsingSmp() || IamDiskProcess());
132 }
133
/// Called when an entry finished swapping out; intentionally a no-op.
void
Rock::SwapDir::swappedOut(const StoreEntry &)
{
    // stats are not stored but computed when needed
}
139
/// Maximum number of entry slots this dir may use: at least the current
/// map size (we cannot shrink), at most what the configured disk size
/// and the sfileno-imposed limit allow.
int64_t
Rock::SwapDir::entryLimitAllowed() const
{
    const int64_t eLimitLo = map ? map->entryLimit() : 0; // dynamic shrinking unsupported
    const int64_t eWanted = (maxSize() - HeaderSize)/maxObjectSize();
    return min(max(eLimitLo, eWanted), entryLimitHigh());
}
147
// TODO: encapsulate as a tool; identical to CossSwapDir::create()
/// Creates the cache_dir directory and the db file on disk. In SMP mode
/// only the disker process does the actual work. Fatal on any failure.
void
Rock::SwapDir::create()
{
    assert(path);
    assert(filePath);

    if (UsingSmp() && !IamDiskProcess()) {
        debugs (47,3, HERE << "disker will create in " << path);
        return;
    }

    debugs (47,3, HERE << "creating in " << path);

    // create the cache_dir directory itself if it does not exist yet
    struct stat swap_sb;
    if (::stat(path, &swap_sb) < 0) {
        debugs (47, DBG_IMPORTANT, "Creating Rock db directory: " << path);
        const int res = mkdir(path, 0700);
        if (res != 0) {
            debugs(47, DBG_CRITICAL, "Failed to create Rock db dir " << path <<
                   ": " << xstrerror());
            fatal("Rock Store db creation error");
        }
    }

#if SLOWLY_FILL_WITH_ZEROS
    // alternative (disabled) implementation: write zeros block by block
    char block[1024];
    Must(maxSize() % sizeof(block) == 0);
    memset(block, '\0', sizeof(block));

    const int swap = open(filePath, O_WRONLY|O_CREAT|O_TRUNC|O_BINARY, 0600);
    for (off_t offset = 0; offset < maxSize(); offset += sizeof(block)) {
        if (write(swap, block, sizeof(block)) != sizeof(block)) {
            debugs(47, DBG_CRITICAL, "ERROR: Failed to create Rock Store db in " << filePath <<
                   ": " << xstrerror());
            fatal("Rock Store db creation error");
        }
    }
    close(swap);
#else
    const int swap = open(filePath, O_WRONLY|O_CREAT|O_TRUNC|O_BINARY, 0600);
    if (swap < 0) {
        debugs(47, DBG_CRITICAL, "ERROR: Failed to initialize Rock Store db in " << filePath <<
               "; create error: " << xstrerror());
        fatal("Rock Store db creation error");
    }

    // reserve the full db size at once instead of writing zeros
    if (ftruncate(swap, maxSize()) != 0) {
        debugs(47, DBG_CRITICAL, "ERROR: Failed to initialize Rock Store db in " << filePath <<
               "; truncate error: " << xstrerror());
        fatal("Rock Store db creation error");
    }

    // write an all-zeros header area at the start of the file
    char header[HeaderSize];
    memset(header, '\0', sizeof(header));
    if (write(swap, header, sizeof(header)) != sizeof(header)) {
        debugs(47, DBG_CRITICAL, "ERROR: Failed to initialize Rock Store db in " << filePath <<
               "; write error: " << xstrerror());
        fatal("Rock Store db initialization error");
    }
    close(swap);
#endif
}
211
/// Second-phase initialization: builds the shared map, selects and starts
/// the DiskIO module, opens the db file, and prepares for rebuild.
void
Rock::SwapDir::init()
{
    debugs(47,2, HERE);

    // XXX: SwapDirs aren't refcounted. We make IORequestor calls, which
    // are refcounted. We up our count once to avoid implicit delete's.
    RefCountReference();

    Must(!map);
    map = new DirMap(path);

    // SMP configurations route I/O through a disker process; otherwise
    // use simple blocking I/O
    const char *ioModule = needsDiskStrand() ? "IpcIo" : "Blocking";
    if (DiskIOModule *m = DiskIOModule::Find(ioModule)) {
        debugs(47,2, HERE << "Using DiskIO module: " << ioModule);
        io = m->createStrategy();
        io->init();
    } else {
        debugs(47, DBG_CRITICAL, "FATAL: Rock store is missing DiskIO module: " <<
               ioModule);
        fatal("Rock Store missing a required DiskIO module");
    }

    theFile = io->newFile(filePath);
    theFile->configure(fileConfig);
    theFile->open(O_RDWR, 0644, this); // async; ioCompletedNotification() follows

    // Increment early. Otherwise, if one SwapDir finishes rebuild before
    // others start, storeRebuildComplete() will think the rebuild is over!
    // TODO: move store_dirs_rebuilding hack to store modules that need it.
    ++StoreController::store_dirs_rebuilding;
}
244
245 bool
246 Rock::SwapDir::needsDiskStrand() const
247 {
248 const bool wontEvenWorkWithoutDisker = Config.workers > 1;
249 const bool wouldWorkBetterWithDisker = DiskIOModule::Find("IpcIo");
250 return InDaemonMode() && (wontEvenWorkWithoutDisker ||
251 wouldWorkBetterWithDisker);
252 }
253
/// Parses a cache_dir configuration line for this dir: records the
/// directory path, derives the db file path, and parses size and options.
void
Rock::SwapDir::parse(int anIndex, char *aPath)
{
    index = anIndex;

    path = xstrdup(aPath);

    // the db file is located at path/rock
    String fname(path);
    fname.append("/rock");
    filePath = xstrdup(fname.termedBuf());

    parseSize(false);
    parseOptions(0);

    // Current openForWriting() code overwrites the old slot if needed
    // and possible, so proactively removing old slots is probably useless.
    assert(!repl); // repl = createRemovalPolicy(Config.replPolicy);

    validateOptions();
}
275
/// Re-parses size and options on "squid -k reconfigure"; changes to
/// non-reconfigurable values are rejected with warnings downstream.
void
Rock::SwapDir::reconfigure()
{
    parseSize(true);
    parseOptions(1);
    // TODO: can we reconfigure the replacement policy (repl)?
    validateOptions();
}
284
/// parse maximum db disk size
/// Reads the size token (in MBytes) from the config line. On
/// reconfigure, a changed size is rejected with a warning because the
/// db cannot be resized dynamically.
void
Rock::SwapDir::parseSize(const bool reconfiguring)
{
    const int i = GetInteger();
    if (i < 0)
        fatal("negative Rock cache_dir size value");
    const uint64_t new_max_size =
        static_cast<uint64_t>(i) << 20; // MBytes to Bytes
    if (!reconfiguring)
        max_size = new_max_size;
    else if (new_max_size != max_size) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir '" << path << "' size "
               "cannot be changed dynamically, value left unchanged (" <<
               (max_size >> 20) << " MB)");
    }
}
302
/// Extends the generic SwapDir option parsers with our rock-specific
/// time (swap-timeout) and rate (max-swap-rate) option handlers.
ConfigOption *
Rock::SwapDir::getOptionTree() const
{
    ConfigOptionVector *vector = dynamic_cast<ConfigOptionVector*>(::SwapDir::getOptionTree());
    assert(vector); // the base class is expected to return a vector
    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseTimeOption, &SwapDir::dumpTimeOption));
    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseRateOption, &SwapDir::dumpRateOption));
    return vector;
}
312
313 bool
314 Rock::SwapDir::allowOptionReconfigure(const char *const option) const
315 {
316 return strcmp(option, "max-size") != 0 &&
317 ::SwapDir::allowOptionReconfigure(option);
318 }
319
/// parses time-specific options; mimics ::SwapDir::optionObjectSizeParse()
/// Returns false if the option is not ours; self-destructs on bad values.
bool
Rock::SwapDir::parseTimeOption(char const *option, const char *value, int reconfiguring)
{
    // TODO: ::SwapDir or, better, Config should provide time-parsing routines,
    // including time unit handling. Same for size.

    time_msec_t *storedTime;
    if (strcmp(option, "swap-timeout") == 0)
        storedTime = &fileConfig.ioTimeout;
    else
        return false; // not a time option we know

    // the option requires a value
    if (!value)
        self_destruct();

    // TODO: handle time units and detect parsing errors better
    const int64_t parsedValue = strtoll(value, NULL, 10);
    if (parsedValue < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
        self_destruct();
    }

    const time_msec_t newTime = static_cast<time_msec_t>(parsedValue);

    // timeouts cannot change dynamically; warn and keep the old value
    if (!reconfiguring)
        *storedTime = newTime;
    else if (*storedTime != newTime) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedTime);
    }

    return true;
}
355
/// reports time-specific options; mimics ::SwapDir::optionObjectSizeDump()
/// Only prints swap-timeout when it was explicitly configured (non-zero).
void
Rock::SwapDir::dumpTimeOption(StoreEntry * e) const
{
    if (fileConfig.ioTimeout)
        storeAppendPrintf(e, " swap-timeout=%" PRId64,
                          static_cast<int64_t>(fileConfig.ioTimeout));
}
364
/// parses rate-specific options; mimics ::SwapDir::optionObjectSizeParse()
/// Returns false if the option is not ours; self-destructs on bad values.
bool
Rock::SwapDir::parseRateOption(char const *option, const char *value, int isaReconfig)
{
    int *storedRate;
    if (strcmp(option, "max-swap-rate") == 0)
        storedRate = &fileConfig.ioRate;
    else
        return false; // not a rate option we know

    // the option requires a value
    if (!value)
        self_destruct();

    // TODO: handle time units and detect parsing errors better
    const int64_t parsedValue = strtoll(value, NULL, 10);
    if (parsedValue < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
        self_destruct();
    }

    const int newRate = static_cast<int>(parsedValue);

    // NOTE(review): parsedValue >= 0 here, so this second check presumably
    // guards against int64-to-int truncation producing a negative value —
    // confirm before simplifying
    if (newRate < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << newRate);
        self_destruct();
    }

    // the rate cannot change dynamically; warn and keep the old value
    if (!isaReconfig)
        *storedRate = newRate;
    else if (*storedRate != newRate) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedRate);
    }

    return true;
}
402
/// reports rate-specific options; mimics ::SwapDir::optionObjectSizeDump()
/// A negative ioRate means "no limit configured" and is not printed.
void
Rock::SwapDir::dumpRateOption(StoreEntry * e) const
{
    if (fileConfig.ioRate >= 0)
        storeAppendPrintf(e, " max-swap-rate=%d", fileConfig.ioRate);
}
410
/// check the results of the configuration; only level-0 debugging works here
/// Requires a positive max-size and warns when the sfileno entry limit
/// prevents using a non-trivial share of the configured disk space.
void
Rock::SwapDir::validateOptions()
{
    if (max_objsize <= 0)
        fatal("Rock store requires a positive max-size");

    // how much wasted space is tolerable before we complain
    const int64_t maxSizeRoundingWaste = 1024 * 1024; // size is configured in MB
    const int64_t maxObjectSizeRoundingWaste = maxObjectSize();
    const int64_t maxRoundingWaste =
        max(maxSizeRoundingWaste, maxObjectSizeRoundingWaste);
    const int64_t usableDiskSize = diskOffset(entryLimitAllowed());
    const int64_t diskWasteSize = maxSize() - usableDiskSize;
    Must(diskWasteSize >= 0);

    // warn if maximum db size is not reachable due to sfileno limit
    if (entryLimitAllowed() == entryLimitHigh() &&
            diskWasteSize >= maxRoundingWaste) {
        debugs(47, DBG_CRITICAL, "Rock store cache_dir[" << index << "] '" << path << "':");
        debugs(47, DBG_CRITICAL, "\tmaximum number of entries: " << entryLimitAllowed());
        debugs(47, DBG_CRITICAL, "\tmaximum object size: " << maxObjectSize() << " Bytes");
        debugs(47, DBG_CRITICAL, "\tmaximum db size: " << maxSize() << " Bytes");
        debugs(47, DBG_CRITICAL, "\tusable db size:  " << usableDiskSize << " Bytes");
        debugs(47, DBG_CRITICAL, "\tdisk space waste: " << diskWasteSize << " Bytes");
        debugs(47, DBG_CRITICAL, "WARNING: Rock store config wastes space.");
    }
}
438
/// Kicks off the asynchronous index rebuild from the on-disk db.
void
Rock::SwapDir::rebuild()
{
    //++StoreController::store_dirs_rebuilding; // see Rock::SwapDir::init()
    AsyncJob::Start(new Rebuild(this));
}
445
/* Add a new object to the cache with empty memory copy and pointer to disk
 * use to rebuild store from disk. Based on UFSSwapDir::addDiskRestore */
/// Returns true if the entry was recorded in its original cell; false
/// when the map refused the write or a newer entry took the cell.
bool
Rock::SwapDir::addEntry(const int filen, const DbCellHeader &header, const StoreEntry &from)
{
    debugs(47, 8, HERE << &from << ' ' << from.getMD5Text() <<
           ", filen="<< std::setfill('0') << std::hex << std::uppercase <<
           std::setw(8) << filen);

    sfileno newLocation = 0;
    if (Ipc::StoreMapSlot *slot = map->openForWriting(reinterpret_cast<const cache_key *>(from.key), newLocation)) {
        if (filen == newLocation) {
            slot->set(from);
            map->extras(filen) = header;
        } // else some other, newer entry got into our cell
        map->closeForWriting(newLocation, false);
        return filen == newLocation;
    }

    return false;
}
467
/// Whether this dir can accept the entry for storage; on success, sets
/// load to the current I/O load estimate.
bool
Rock::SwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const
{
    // account for our per-entry cell header when checking size limits
    if (!::SwapDir::canStore(e, sizeof(DbCellHeader)+diskSpaceNeeded, load))
        return false;

    if (!theFile || !theFile->canWrite())
        return false;

    if (!map)
        return false;

    // Do not start I/O transaction if there are less than 10% free pages left.
    // TODO: reserve page instead
    if (needsDiskStrand() &&
            Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
        debugs(47, 5, HERE << "too few shared pages for IPC I/O left");
        return false;
    }

    if (io->shedLoad())
        return false;

    load = io->load();
    return true;
}
494
/// Opens a new disk slot for writing entry e; returns the I/O state that
/// tracks the swapout, or NULL if the entry cannot be written now.
StoreIOState::Pointer
Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
{
    if (!theFile || theFile->error()) {
        debugs(47,4, HERE << theFile);
        return NULL;
    }

    // compute payload size for our cell header, using StoreEntry info
    // careful: e.objectLen() may still be negative here
    const int64_t expectedReplySize = e.mem_obj->expectedReplySize();
    assert(expectedReplySize >= 0); // must know to prevent cell overflows
    assert(e.mem_obj->swap_hdr_sz > 0);
    DbCellHeader header;
    header.payloadSize = e.mem_obj->swap_hdr_sz + expectedReplySize;
    const int64_t payloadEnd = sizeof(DbCellHeader) + header.payloadSize;
    assert(payloadEnd <= max_objsize); // the entry fits one fixed-size cell

    // lock a free (or replaceable) cell for writing
    sfileno filen;
    Ipc::StoreMapSlot *const slot =
        map->openForWriting(reinterpret_cast<const cache_key *>(e.key), filen);
    if (!slot) {
        debugs(47, 5, HERE << "map->add failed");
        return NULL;
    }
    e.swap_file_sz = header.payloadSize; // and will be copied to the map
    slot->set(e);
    map->extras(filen) = header;

    // XXX: We rely on our caller, storeSwapOutStart(), to set e.fileno.
    // If that does not happen, the entry will not decrement the read level!

    IoState *sio = new IoState(this, &e, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = filen;
    sio->payloadEnd = payloadEnd;
    sio->diskOffset = diskOffset(sio->swap_filen);

    debugs(47,5, HERE << "dir " << index << " created new filen " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen << std::dec << " at " << sio->diskOffset);

    // the whole cell must fit within the db file
    assert(sio->diskOffset + payloadEnd <= diskOffsetLimit());

    sio->file(theFile);

    trackReferences(e); // inform the removal policy, if any
    return sio;
}
545
546 int64_t
547 Rock::SwapDir::diskOffset(int filen) const
548 {
549 assert(filen >= 0);
550 return HeaderSize + max_objsize*filen;
551 }
552
553 int64_t
554 Rock::SwapDir::diskOffsetLimit() const
555 {
556 assert(map);
557 return diskOffset(map->entryLimit());
558 }
559
// tries to open an old or being-written-to entry with swap_filen for reading
StoreIOState::Pointer
Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
{
    if (!theFile || theFile->error()) {
        debugs(47,4, HERE << theFile);
        return NULL;
    }

    // the entry must already be associated with a disk slot
    if (e.swap_filen < 0) {
        debugs(47,4, HERE << e);
        return NULL;
    }

    // Do not start I/O transaction if there are less than 10% free pages left.
    // TODO: reserve page instead
    if (needsDiskStrand() &&
            Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
        debugs(47, 5, HERE << "too few shared pages for IPC I/O left");
        return NULL;
    }

    // There are two ways an entry can get swap_filen: our get() locked it for
    // reading or our storeSwapOutStart() locked it for writing. Peeking at our
    // locked entry is safe, but no support for reading a filling entry.
    const Ipc::StoreMapSlot *slot = map->peekAtReader(e.swap_filen);
    if (!slot)
        return NULL; // we were writing afterall

    IoState *sio = new IoState(this, &e, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = e.swap_filen;
    // payload size was recorded in the map extras at swapout time
    sio->payloadEnd = sizeof(DbCellHeader) + map->extras(e.swap_filen).payloadSize;
    assert(sio->payloadEnd <= max_objsize); // the payload fits the slot

    debugs(47,5, HERE << "dir " << index << " has old filen: " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen);

    // sanity: the map and the entry must agree on the stored size
    assert(slot->basics.swap_file_sz > 0);
    assert(slot->basics.swap_file_sz == e.swap_file_sz);

    sio->diskOffset = diskOffset(sio->swap_filen);
    assert(sio->diskOffset + sio->payloadEnd <= diskOffsetLimit());

    sio->file(theFile);
    return sio;
}
609
/// Called by the DiskIO layer when the db file open (from init())
/// completes; fatal on failure, otherwise starts the index rebuild.
void
Rock::SwapDir::ioCompletedNotification()
{
    if (!theFile)
        fatalf("Rock cache_dir failed to initialize db file: %s", filePath);

    if (theFile->error())
        fatalf("Rock cache_dir at %s failed to open db file: %s", filePath,
               xstrerror());

    debugs(47, 2, "Rock cache_dir[" << index << "] limits: " <<
           std::setw(12) << maxSize() << " disk bytes and " <<
           std::setw(7) << map->entryLimit() << " entries");

    rebuild();
}
626
/// Called by the DiskIO layer when the db file has been closed;
/// drops our reference to it.
void
Rock::SwapDir::closeCompleted()
{
    theFile = NULL;
}
632
/// DiskIO callback for a finished read: updates the I/O state offset and
/// forwards the result to the store client's read callback.
void
Rock::SwapDir::readCompleted(const char *buf, int rlen, int errflag, RefCount< ::ReadRequest> r)
{
    ReadRequest *request = dynamic_cast<Rock::ReadRequest*>(r.getRaw());
    assert(request);
    IoState::Pointer sio = request->sio;

    if (errflag == DISK_OK && rlen > 0)
        sio->offset_ += rlen;
    assert(sio->diskOffset + sio->offset_ <= diskOffsetLimit()); // post-factum

    // consume the single-shot read callback before invoking it
    StoreIOState::STRCB *callback = sio->read.callback;
    assert(callback);
    sio->read.callback = NULL;
    void *cbdata;
    // only call back if the client's cbdata is still valid
    if (cbdataReferenceValidDone(sio->read.callback_data, &cbdata))
        callback(cbdata, r->buf, rlen, sio.getRaw());
}
651
/// DiskIO callback for a finished write: updates the map lock state and
/// notifies the I/O state that writing is over.
void
Rock::SwapDir::writeCompleted(int errflag, size_t rlen, RefCount< ::WriteRequest> r)
{
    Rock::WriteRequest *request = dynamic_cast<Rock::WriteRequest*>(r.getRaw());
    assert(request);
    assert(request->sio !=  NULL);
    IoState &sio = *request->sio;

    if (errflag == DISK_OK) {
        // close, assuming we only write once; the entry gets the read lock
        map->closeForWriting(sio.swap_filen, true);
        // do not increment sio.offset_ because we do it in sio->write()
    } else {
        // Do not abortWriting here. The entry should keep the write lock
        // instead of losing association with the store and confusing core.
        map->free(sio.swap_filen); // will mark as unusable, just in case
    }

    assert(sio.diskOffset + sio.offset_ <= diskOffsetLimit()); // post-factum

    sio.finishedWriting(errflag);
}
674
675 bool
676 Rock::SwapDir::full() const
677 {
678 return map && map->full();
679 }
680
// storeSwapOutFileClosed calls this method on DISK_NO_SPACE_LEFT,
// but it should not happen for us because the db file is preallocated
void
Rock::SwapDir::diskFull()
{
    debugs(20, DBG_IMPORTANT, "BUG: No space left with rock cache_dir: " <<
           filePath);
}
689
/// purge while full(); it should be sufficient to purge just one
/// Uses the removal policy to release entries until the dir is no longer
/// full, bounded by maxFreed to avoid "infinite" loops.
void
Rock::SwapDir::maintain()
{
    debugs(47,3, HERE << "cache_dir[" << index << "] guards: " <<
           !repl << !map << !full() << StoreController::store_dirs_rebuilding);

    if (!repl)
        return; // no means (cannot find a victim)

    if (!map)
        return; // no victims (yet)

    if (!full())
        return; // no need (to find a victim)

    // XXX: UFSSwapDir::maintain says we must quit during rebuild
    if (StoreController::store_dirs_rebuilding)
        return;

    debugs(47,3, HERE << "cache_dir[" << index << "] state: " << map->full() <<
           ' ' << currentSize() << " < " << diskOffsetLimit());

    // Hopefully, we find a removable entry much sooner (TODO: use time?)
    const int maxProbed = 10000;
    RemovalPurgeWalker *walker = repl->PurgeInit(repl, maxProbed);

    // It really should not take that long, but this will stop "infinite" loops
    const int maxFreed = 1000;
    int freed = 0;
    // TODO: should we purge more than needed to minimize overheads?
    for (; freed < maxFreed && full(); ++freed) {
        if (StoreEntry *e = walker->Next(walker))
            e->release(); // will call our unlink() method
        else
            break; // no more objects
    }

    debugs(47,2, HERE << "Rock cache_dir[" << index << "] freed " << freed <<
           " scanned " << walker->scanned << '/' << walker->locked);

    walker->Done(walker); // destroys the walker

    if (full()) {
        debugs(47, DBG_CRITICAL, "ERROR: Rock cache_dir[" << index << "] " <<
               "is still full after freeing " << freed << " entries. A bug?");
    }
}
738
/// Tells the removal policy (if any) that the entry was referenced.
void
Rock::SwapDir::reference(StoreEntry &e)
{
    debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
    if (repl && repl->Referenced)
        repl->Referenced(repl, &e, &e.repl);
}
746
747 bool
748 Rock::SwapDir::dereference(StoreEntry &e)
749 {
750 debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
751 if (repl && repl->Dereferenced)
752 repl->Dereferenced(repl, &e, &e.repl);
753
754 // no need to keep e in the global store_table for us; we have our own map
755 return false;
756 }
757
758 bool
759 Rock::SwapDir::unlinkdUseful() const
760 {
761 // no entry-specific files to unlink
762 return false;
763 }
764
/// Removes the entry from this dir: frees its map slot and breaks the
/// entry-to-slot association.
void
Rock::SwapDir::unlink(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    ignoreReferences(e); // remove from the removal policy first
    map->free(e.swap_filen);
    disconnect(e);
}
773
/// Adds the entry to the removal policy index, if a policy is configured.
void
Rock::SwapDir::trackReferences(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    if (repl)
        repl->Add(repl, &e, &e.repl);
}
781
/// Removes the entry from the removal policy index, if a policy is configured.
void
Rock::SwapDir::ignoreReferences(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    if (repl)
        repl->Remove(repl, &e, &e.repl);
}
789
/// Appends human-readable statistics about this dir to the given entry
/// (for cache manager reports).
void
Rock::SwapDir::statfs(StoreEntry &e) const
{
    storeAppendPrintf(&e, "\n");
    storeAppendPrintf(&e, "Maximum Size: %" PRIu64 " KB\n", maxSize() >> 10);
    storeAppendPrintf(&e, "Current Size: %.2f KB %.2f%%\n",
                      currentSize() / 1024.0,
                      Math::doublePercent(currentSize(), maxSize()));

    if (map) {
        const int limit = map->entryLimit();
        storeAppendPrintf(&e, "Maximum entries: %9d\n", limit);
        if (limit > 0) {
            const int entryCount = map->entryCount();
            storeAppendPrintf(&e, "Current entries: %9d %.2f%%\n",
                              entryCount, (100.0 * entryCount / limit));

            // lock statistics are per-entry, so only report for tiny maps
            if (limit < 100) { // XXX: otherwise too expensive to count
                Ipc::ReadWriteLockStats stats;
                map->updateStats(stats);
                stats.dump(e);
            }
        }
    }

    storeAppendPrintf(&e, "Pending operations: %d out of %d\n",
                      store_open_disk_fd, Config.max_open_disk_fds);

    storeAppendPrintf(&e, "Flags:");

    if (flags.selected)
        storeAppendPrintf(&e, " SELECTED");

    if (flags.read_only)
        storeAppendPrintf(&e, " READ-ONLY");

    storeAppendPrintf(&e, "\n");

}
829
namespace Rock
{
// registers SwapDirRr so its create() runs after configuration parsing
RunnerRegistrationEntry(rrAfterConfig, SwapDirRr);
}
834
/// After configuration, creates the shared map owner for every
/// configured rock cache_dir; owners are destroyed with this runner.
void Rock::SwapDirRr::create(const RunnerRegistry &)
{
    Must(owners.empty());
    for (int i = 0; i < Config.cacheSwap.n_configured; ++i) {
        // only rock cache_dirs need a shared map owner
        if (const Rock::SwapDir *const sd = dynamic_cast<Rock::SwapDir *>(INDEXSD(i))) {
            Rock::SwapDir::DirMap::Owner *const owner =
                Rock::SwapDir::DirMap::Init(sd->path, sd->entryLimitAllowed());
            owners.push_back(owner);
        }
    }
}
846
847 Rock::SwapDirRr::~SwapDirRr()
848 {
849 for (size_t i = 0; i < owners.size(); ++i)
850 delete owners[i];
851 }