// src/fs/rock/RockRebuild.cc — Initial Large Rock implementation.
1 /*
2 * DEBUG: section 79 Disk IO Routines
3 */
4
5 #include "squid.h"
6 #include "disk.h"
7 #include "fs/rock/RockRebuild.h"
8 #include "fs/rock/RockSwapDir.h"
9 #include "fs/rock/RockDbCell.h"
10 #include "ipc/StoreMap.h"
11 #include "globals.h"
12 #include "md5.h"
13 #include "tools.h"
14 #include "typedefs.h"
15 #include "SquidTime.h"
16 #include "store_rebuild.h"
17
18 #if HAVE_ERRNO_H
19 #include <errno.h>
20 #endif
21
22 CBDATA_NAMESPACED_CLASS_INIT(Rock, Rebuild);
23
24 Rock::Rebuild::Rebuild(SwapDir *dir): AsyncJob("Rock::Rebuild"),
25 sd(dir),
26 dbSize(0),
27 dbEntrySize(0),
28 dbEntryLimit(0),
29 dbSlot(0),
30 fd(-1),
31 dbOffset(0),
32 filen(0)
33 {
34 assert(sd);
35 memset(&counts, 0, sizeof(counts));
36 dbSize = sd->diskOffsetLimit(); // we do not care about the trailer waste
37 dbEntrySize = sd->max_objsize;
38 dbEntryLimit = sd->entryLimit();
39 loaded.reserve(dbSize);
40 for (size_t i = 0; i < loaded.size(); ++i)
41 loaded.push_back(false);
42 }
43
44 Rock::Rebuild::~Rebuild()
45 {
46 if (fd >= 0)
47 file_close(fd);
48 }
49
50 /// prepares and initiates entry loading sequence
51 void
52 Rock::Rebuild::start()
53 {
54 // in SMP mode, only the disker is responsible for populating the map
55 if (UsingSmp() && !IamDiskProcess()) {
56 debugs(47, 2, "Non-disker skips rebuilding of cache_dir #" <<
57 sd->index << " from " << sd->filePath);
58 mustStop("non-disker");
59 return;
60 }
61
62 debugs(47, DBG_IMPORTANT, "Loading cache_dir #" << sd->index <<
63 " from " << sd->filePath);
64
65 fd = file_open(sd->filePath, O_RDONLY | O_BINARY);
66 if (fd < 0)
67 failure("cannot open db", errno);
68
69 char buf[SwapDir::HeaderSize];
70 if (read(fd, buf, sizeof(buf)) != SwapDir::HeaderSize)
71 failure("cannot read db header", errno);
72
73 dbOffset = SwapDir::HeaderSize;
74 filen = 0;
75
76 checkpoint();
77 }
78
79 /// continues after a pause if not done
80 void
81 Rock::Rebuild::checkpoint()
82 {
83 if (dbOffset < dbSize)
84 eventAdd("Rock::Rebuild", Rock::Rebuild::Steps, this, 0.01, 1, true);
85 else
86 if (!doneAll()) {
87 eventAdd("Rock::Rebuild::Step2", Rock::Rebuild::Steps2, this, 0.01, 1,
88 true);
89 }
90 }
91
92 bool
93 Rock::Rebuild::doneAll() const
94 {
95 return dbSlot >= dbSize && AsyncJob::doneAll();
96 }
97
98 void
99 Rock::Rebuild::Steps(void *data)
100 {
101 // use async call to enable job call protection that time events lack
102 CallJobHere(47, 5, static_cast<Rebuild*>(data), Rock::Rebuild, steps);
103 }
104
105 void
106 Rock::Rebuild::Steps2(void *data)
107 {
108 // use async call to enable job call protection that time events lack
109 CallJobHere(47, 5, static_cast<Rebuild*>(data), Rock::Rebuild, steps2);
110 }
111
112 void
113 Rock::Rebuild::steps()
114 {
115 debugs(47,5, HERE << sd->index << " filen " << filen << " at " <<
116 dbOffset << " <= " << dbSize);
117
118 // Balance our desire to maximize the number of entries processed at once
119 // (and, hence, minimize overheads and total rebuild time) with a
120 // requirement to also process Coordinator events, disk I/Os, etc.
121 const int maxSpentMsec = 50; // keep small: most RAM I/Os are under 1ms
122 const timeval loopStart = current_time;
123
124 int loaded = 0;
125 while (loaded < dbEntryLimit && dbOffset < dbSize) {
126 doOneEntry();
127 dbOffset += dbEntrySize;
128 ++filen;
129 ++loaded;
130
131 if (counts.scancount % 1000 == 0)
132 storeRebuildProgress(sd->index, dbEntryLimit, counts.scancount);
133
134 if (opt_foreground_rebuild)
135 continue; // skip "few entries at a time" check below
136
137 getCurrentTime();
138 const double elapsedMsec = tvSubMsec(loopStart, current_time);
139 if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
140 debugs(47, 5, HERE << "pausing after " << loaded << " entries in " <<
141 elapsedMsec << "ms; " << (elapsedMsec/loaded) << "ms per entry");
142 break;
143 }
144 }
145
146 checkpoint();
147 }
148
149 void
150 Rock::Rebuild::steps2()
151 {
152 debugs(47,5, HERE << sd->index << " filen " << filen << " at " <<
153 dbSlot << " <= " << dbSize);
154
155 // Balance our desire to maximize the number of slots processed at once
156 // (and, hence, minimize overheads and total rebuild time) with a
157 // requirement to also process Coordinator events, disk I/Os, etc.
158 const int maxSpentMsec = 50; // keep small: most RAM I/Os are under 1ms
159 const timeval loopStart = current_time;
160
161 int loaded = 0;
162 while (dbSlot < dbSize) {
163 doOneSlot();
164 ++dbSlot;
165 ++loaded;
166
167 if (opt_foreground_rebuild)
168 continue; // skip "few entries at a time" check below
169
170 getCurrentTime();
171 const double elapsedMsec = tvSubMsec(loopStart, current_time);
172 if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
173 debugs(47, 5, HERE << "pausing after " << loaded << " slots in " <<
174 elapsedMsec << "ms; " << (elapsedMsec/loaded) << "ms per slot");
175 break;
176 }
177 }
178
179 checkpoint();
180 }
181
/// Reads and validates the on-disk cell header at dbOffset, copying it into
/// the shared slot map (sd->dbSlot). Truncated or malformed headers are
/// counted and their slots handed back via invalidSlot().
void
Rock::Rebuild::doOneEntry()
{
    debugs(47,5, HERE << sd->index << " filen " << filen << " at " <<
           dbOffset << " <= " << dbSize);

    ++counts.scancount;

    // failure() is fatal and does not return
    if (lseek(fd, dbOffset, SEEK_SET) < 0)
        failure("cannot seek to db entry", errno);

    // read just the fixed-size cell header, not the whole cell
    MemBuf buf;
    buf.init(sizeof(DbCellHeader), sizeof(DbCellHeader));

    if (!storeRebuildLoadEntry(fd, sd->index, buf, counts))
        return; // the load helper has already accounted for the failure

    // get our header
    Ipc::Mem::PageId pageId;
    pageId.pool = sd->index;
    pageId.number = filen + 1; // shared-map slot numbers are 1-based
    DbCellHeader &header = sd->dbSlot(pageId);
    // a freshly-mapped slot must not look like a valid header yet
    assert(!header.sane());

    if (buf.contentSize() < static_cast<mb_size_t>(sizeof(header))) {
        debugs(47, DBG_IMPORTANT, "WARNING: cache_dir[" << sd->index << "]: " <<
               "Ignoring truncated cache entry meta data at " << dbOffset);
        invalidSlot(pageId);
        return;
    }
    // publish the raw header into the shared map; phase 2 (steps2) will
    // validate chains built from these headers
    memcpy(&header, buf.content(), sizeof(header));

    if (!header.sane()) {
        debugs(47, DBG_IMPORTANT, "WARNING: cache_dir[" << sd->index << "]: " <<
               "Ignoring malformed cache entry meta data at " << dbOffset);
        invalidSlot(pageId);
        return;
    }
}
221
222 void
223 Rock::Rebuild::doOneSlot()
224 {
225 debugs(47,5, HERE << sd->index << " filen " << filen << " at " <<
226 dbSlot << " <= " << dbSize);
227
228 if (loaded[dbSlot])
229 return;
230
231 Ipc::Mem::PageId pageId;
232 pageId.pool = sd->index;
233 pageId.number = dbSlot + 1;
234 const DbCellHeader &dbSlot = sd->dbSlot(pageId);
235 assert(dbSlot.sane());
236
237 pageId.number = dbSlot.firstSlot;
238 //const DbCellHeader &firstChainSlot = sd->dbSlot(pageId);
239
240 /* Process all not yet loaded slots, verify entry chains, if chain
241 is valid, load entry from first slot similar to small rock,
242 call SwapDir::addEntry (needs to be restored). */
243 }
244
/// Job epilogue: reports completion and lowers the global count of
/// cache_dirs still rebuilding, whether we finished or were stopped.
void
Rock::Rebuild::swanSong()
{
    debugs(47,3, HERE << "cache_dir #" << sd->index << " rebuild level: " <<
           StoreController::store_dirs_rebuilding);
    // balances the increment made when this rebuild was scheduled
    --StoreController::store_dirs_rebuilding;
    storeRebuildComplete(&counts);
}
253
/// Reports a fatal rebuild error and terminates Squid via fatalf();
/// never returns. \param msg short description of what failed
/// \param errNo errno value at the time of failure (0 if not applicable)
void
Rock::Rebuild::failure(const char *msg, int errNo)
{
    // NOTE(review): always prints phase-1 progress (dbOffset), even when
    // called during phase 2 — acceptable for a fatal-path diagnostic
    debugs(47,5, HERE << sd->index << " filen " << filen << " at " <<
           dbOffset << " <= " << dbSize);

    if (errNo)
        debugs(47, DBG_CRITICAL, "ERROR: Rock cache_dir rebuild failure: " << xstrerr(errNo));
    debugs(47, DBG_CRITICAL, "Do you need to run 'squid -z' to initialize storage?");

    assert(sd);
    fatalf("Rock cache_dir[%d] rebuild of %s failed: %s.",
           sd->index, sd->filePath, msg);
}
268
269 void Rock::Rebuild::invalidSlot(Ipc::Mem::PageId &pageId)
270 {
271 ++counts.invalid;
272 loaded[pageId.number - 1] = true;
273 sd->dbSlotIndex->push(pageId);
274 }