]> git.ipfire.org Git - thirdparty/squid.git/blob - src/Transients.cc
Polishing touches to address PREVIEW review concerns dated 2013/07/03.
[thirdparty/squid.git] / src / Transients.cc
1 /*
2 * DEBUG: section 20 Storage Manager
3 *
4 */
5
6 #include "squid.h"
7 #include "base/RunnersRegistry.h"
8 #include "CollapsedForwarding.h"
9 #include "HttpReply.h"
10 #include "ipc/mem/Page.h"
11 #include "ipc/mem/Pages.h"
12 #include "MemObject.h"
13 #include "mime_header.h"
14 #include "SquidConfig.h"
15 #include "SquidMath.h"
16 #include "StoreStats.h"
17 #include "tools.h"
18 #include "Transients.h"
19
20 #if HAVE_LIMITS_H
21 #include <limits>
22 #endif
23
/// shared memory segment path to use for Transients maps
/// (the same label is used by TransientsRr::create() and Transients::init())
static const char *MapLabel = "transients_map";
26
/// starts with no shared map and no local entry index; both are built in init()
Transients::Transients(): map(NULL), locals(NULL)
{
}
30
Transients::~Transients()
{
    // locals stores non-owning StoreEntry pointers, so deleting the
    // container does not destroy the entries themselves
    delete map;
    delete locals;
}
36
/// allocates the shared map and the local entry index;
/// a no-op when SMP collapsed forwarding is disabled (EntryLimit() <= 0)
void
Transients::init()
{
    const int64_t entryLimit = EntryLimit();
    if (entryLimit <= 0)
        return; // no SMP support or a misconfiguration

    Must(!map);
    map = new TransientsMap(MapLabel);
    map->cleaner = this;

    // per-worker table of local StoreEntry pointers, one slot per map entry
    locals = new Locals(entryLimit, 0);
}
50
/// reports shared memory cache-page usage;
/// compiled out unless TRANSIENT_STATS_SUPPORTED is defined
void
Transients::getStats(StoreInfoStats &stats) const
{
#if TRANSIENT_STATS_SUPPORTED
    const size_t pageSize = Ipc::Mem::PageSize();

    stats.mem.shared = true;
    stats.mem.capacity =
        Ipc::Mem::PageLimit(Ipc::Mem::PageId::cachePage) * pageSize;
    stats.mem.size =
        Ipc::Mem::PageLevel(Ipc::Mem::PageId::cachePage) * pageSize;
    stats.mem.count = currentCount();
#endif
}
65
/// appends a human-readable "Transient Objects" usage report to entry e
void
Transients::stat(StoreEntry &e) const
{
    storeAppendPrintf(&e, "\n\nTransient Objects\n");

    storeAppendPrintf(&e, "Maximum Size: %.0f KB\n", maxSize()/1024.0);
    storeAppendPrintf(&e, "Current Size: %.2f KB %.2f%%\n",
                      currentSize() / 1024.0,
                      Math::doublePercent(currentSize(), maxSize()));

    if (map) {
        const int limit = map->entryLimit();
        storeAppendPrintf(&e, "Maximum entries: %9d\n", limit);
        if (limit > 0) {
            storeAppendPrintf(&e, "Current entries: %" PRId64 " %.2f%%\n",
                              currentCount(), (100.0 * currentCount() / limit));
        }
    }
}
85
/// Store API; transient entries are removed when abandoned/disconnected,
/// so periodic maintenance has nothing to do here
void
Transients::maintain()
{
    // no lazy garbage collection needed
}
91
uint64_t
Transients::minSize() const
{
    return 0; // XXX: irrelevant, but Store parent forces us to implement this
}
97
uint64_t
Transients::maxSize() const
{
    // Squid currently does not limit the total size of all transient objects
    return std::numeric_limits<uint64_t>::max();
}
104
uint64_t
Transients::currentSize() const
{
    // TODO: we do not get enough information to calculate this
    // StoreEntry should update associated stores when its size changes
    return 0;
}
112
113 uint64_t
114 Transients::currentCount() const
115 {
116 return map ? map->entryCount() : 0;
117 }
118
119 int64_t
120 Transients::maxObjectSize() const
121 {
122 // Squid currently does not limit the size of a transient object
123 return std::numeric_limits<uint64_t>::max();
124 }
125
/// Store API; we do not track entry popularity
void
Transients::reference(StoreEntry &)
{
    // no replacement policy (but the cache(s) storing the entry may have one)
}
131
/// Store API; \returns false because we never need store_table to keep
/// the entry alive on our behalf
bool
Transients::dereference(StoreEntry &, bool)
{
    // no need to keep e in the global store_table for us; we have our own map
    return false;
}
138
/// Store API; we perform no asynchronous work, so there is nothing to poll
int
Transients::callback()
{
    return 0;
}
144
/// Store API; iterating transient entries is not supported and
/// calling this is a fatal programming error
StoreSearch *
Transients::search(String const, HttpRequest *)
{
    fatal("not implemented");
    return NULL;
}
151
/// Looks up a shared in-transit entry by its cache key. On success,
/// returns a new local StoreEntry tied to the shared slot, keeping the
/// read lock so we continue receiving updates from the writer. Returns
/// NULL (releasing the read lock) for private entries and load failures.
StoreEntry *
Transients::get(const cache_key *key)
{
    if (!map)
        return NULL; // init() did not run (no SMP collapsed forwarding)

    sfileno index;
    const Ipc::StoreMapAnchor *anchor = map->openForReading(key, index);
    if (!anchor)
        return NULL;

    // If we already have a local entry, the store_table should have found it.
    // Since it did not, the local entry key must have changed from public to
    // private. We still need to keep the private entry around for syncing as
    // its clients depend on it, but we should not allow new clients to join.
    if (StoreEntry *oldE = locals->at(index)) {
        debugs(20, 3, "not joining private " << *oldE);
        assert(EBIT_TEST(oldE->flags, KEY_PRIVATE));
    } else if (StoreEntry *newE = copyFromShm(index)) {
        return newE; // keep read lock to receive updates from others
    }

    // private entry or loading failure
    map->closeForReading(index);
    return NULL;
}
178
/// Builds a fresh local StoreEntry from the shared Extras stored at the
/// given map position and remembers it in our locals table. The new entry
/// is marked as an SMP reader (xitTable.io = ioReading).
StoreEntry *
Transients::copyFromShm(const sfileno index)
{
    const TransientsMap::Extras &extras = map->extras(index);

    // create a brand new store entry and initialize it with stored info
    StoreEntry *e = storeCreatePureEntry(extras.url, extras.url,
                                         extras.reqFlags, extras.reqMethod);

    assert(e->mem_obj);
    e->mem_obj->method = extras.reqMethod;
    e->mem_obj->xitTable.io = MemObject::ioReading;
    e->mem_obj->xitTable.index = index;

    e->setPublicKey();
    assert(e->key);

    // How do we know its SMP- and not just locally-collapsed? A worker gets
    // locally-collapsed entries from the local store_table, not Transients.
    // TODO: Can we remove smpCollapsed by not syncing non-transient entries?
    e->mem_obj->smpCollapsed = true;

    assert(!locals->at(index));
    // We do not lock e because we do not want to prevent its destruction;
    // e is tied to us via mem_obj so we will know when it is destructed.
    locals->at(index) = e;
    return e;
}
207
/// legacy Store API; the callback-based lookup is never used for
/// transient entries and calling it is a fatal programming error
void
Transients::get(String const key, STOREGETCLIENT aCallback, void *aCallbackData)
{
    // XXX: not needed but Store parent forces us to implement this
    fatal("Transients::get(key,callback,data) should not be called");
}
214
/// \returns the local StoreEntry previously tied to the given shared map
/// position (via get()/startWriting()) or NULL if we have none for that slot
StoreEntry *
Transients::findCollapsed(const sfileno index)
{
    if (!map)
        return NULL; // init() did not run (no SMP collapsed forwarding)

    if (StoreEntry *oldE = locals->at(index)) {
        debugs(20, 5, "found " << *oldE << " at " << index << " in " << MapLabel);
        assert(oldE->mem_obj && oldE->mem_obj->xitTable.index == index);
        return oldE;
    }

    debugs(20, 3, "no entry at " << index << " in " << MapLabel);
    return NULL;
}
230
/// Registers entry e in the shared map as being written, so that other
/// workers can find it and collapse onto it. On success, e becomes the
/// SMP writer (xitTable.io = ioWriting) and we keep the write lock to
/// supply updates. On any failure, the map slot is released and e stays
/// unregistered (xitTable.index remains negative).
void
Transients::startWriting(StoreEntry *e, const RequestFlags &reqFlags,
                         const HttpRequestMethod &reqMethod)
{
    assert(e);
    assert(e->mem_obj);
    assert(e->mem_obj->xitTable.index < 0); // must not already be in the map

    if (!map) {
        debugs(20, 5, "No map to add " << *e);
        return;
    }

    sfileno index = 0;
    Ipc::StoreMapAnchor *slot = map->openForWriting(reinterpret_cast<const cache_key *>(e->key), index);
    if (!slot) {
        debugs(20, 5, "collision registering " << *e);
        return;
    }

    try {
        if (copyToShm(*e, index, reqFlags, reqMethod)) {
            slot->set(*e);
            e->mem_obj->xitTable.io = MemObject::ioWriting;
            e->mem_obj->xitTable.index = index;
            map->startAppending(index);
            // keep write lock -- we will be supplying others with updates
            return;
        }
        // fall through to the error handling code
    }
    catch (const std::exception &x) { // TODO: should we catch ... as well?
        debugs(20, 2, "error keeping entry " << index <<
               ' ' << *e << ": " << x.what());
        // fall through to the error handling code
    }

    map->abortWriting(index);
}
270
/// copies all relevant local data to shared memory
/// \returns true (failures surface as exceptions thrown by Must())
bool
Transients::copyToShm(const StoreEntry &e, const sfileno index,
                      const RequestFlags &reqFlags,
                      const HttpRequestMethod &reqMethod)
{
    TransientsMap::Extras &extras = map->extras(index);

    const char *url = e.url();
    const size_t urlLen = strlen(url);
    Must(urlLen < sizeof(extras.url)); // we have space to store it all, plus 0
    strncpy(extras.url, url, sizeof(extras.url));
    extras.url[urlLen] = '\0'; // strncpy() already copied the 0, but be explicit

    extras.reqFlags = reqFlags;

    // NOTE(review): presumably METHOD_OTHER is rejected because a custom
    // method cannot be reconstructed from the fixed Extras fields — confirm
    Must(reqMethod != Http::METHOD_OTHER);
    extras.reqMethod = reqMethod.id();

    return true;
}
292
/// StoreMapCleaner API; called by the map when a slice is being freed
void
Transients::noteFreeMapSlice(const sfileno sliceId)
{
    // TODO: we should probably find the entry being deleted and abort it
}
298
/// marks the shared slot of entry e as waiting-to-be-freed (no new readers
/// can join) and broadcasts the change to other workers
void
Transients::abandon(const StoreEntry &e)
{
    assert(e.mem_obj && map);
    map->freeEntry(e.mem_obj->xitTable.index); // just marks the locked entry
    CollapsedForwarding::Broadcast(e);
    // We do not unlock the entry now because the problem is most likely with
    // the server resource rather than a specific cache writer, so we want to
    // prevent other readers from collapsing requests for that resource.
}
309
/// whether the entry's writer has abandoned it (see abandonedAt())
bool
Transients::abandoned(const StoreEntry &e) const
{
    assert(e.mem_obj);
    return abandonedAt(e.mem_obj->xitTable.index);
}
316
/// whether an in-transit entry at the index is now abandoned by its writer
/// (i.e. its shared map anchor is marked waitingToBeFreed)
bool
Transients::abandonedAt(const sfileno index) const
{
    assert(map);
    return map->readableEntry(index).waitingToBeFreed;
}
324
/// Called when we are done writing entry e: marks the slot so no future
/// readers join, releases our write lock, and detaches e from the map
/// (xitTable reset to index -1, ioDone). A no-op for unregistered entries.
void
Transients::completeWriting(const StoreEntry &e)
{
    if (e.mem_obj && e.mem_obj->xitTable.index >= 0) {
        assert(e.mem_obj->xitTable.io == MemObject::ioWriting);
        // there will be no more updates from us after this, so we must prevent
        // future readers from joining
        map->freeEntry(e.mem_obj->xitTable.index); // just marks the locked entry
        map->closeForWriting(e.mem_obj->xitTable.index);
        e.mem_obj->xitTable.index = -1;
        e.mem_obj->xitTable.io = MemObject::ioDone;
    }
}
338
/// the number of readers holding a read lock on the entry's shared map
/// slot; zero for entries that are not registered in our map
int
Transients::readers(const StoreEntry &e) const
{
    if (e.mem_obj && e.mem_obj->xitTable.index >= 0) {
        assert(map);
        return map->peekAtEntry(e.mem_obj->xitTable.index).lock.readers;
    }
    return 0;
}
348
/// Store API; abandons the entry, but only if we are its transient writer
/// (readers have nothing to mark in the shared map)
void
Transients::markForUnlink(StoreEntry &e)
{
    if (e.mem_obj && e.mem_obj->xitTable.io == MemObject::ioWriting)
        abandon(e);
}
355
/// Severs the association between mem_obj and its shared map slot:
/// releases our lock (aborting the write if we were the writer), forgets
/// the local entry pointer, and resets xitTable to the detached state.
void
Transients::disconnect(MemObject &mem_obj)
{
    if (mem_obj.xitTable.index >= 0) {
        assert(map);
        if (mem_obj.xitTable.io == MemObject::ioWriting) {
            map->abortWriting(mem_obj.xitTable.index);
        } else {
            assert(mem_obj.xitTable.io == MemObject::ioReading);
            map->closeForReading(mem_obj.xitTable.index);
        }
        locals->at(mem_obj.xitTable.index) = NULL; // forget the local entry
        mem_obj.xitTable.index = -1;
        mem_obj.xitTable.io = MemObject::ioDone;
    }
}
372
373 /// calculates maximum number of entries we need to store and map
374 int64_t
375 Transients::EntryLimit()
376 {
377 // TODO: we should also check whether any SMP-aware caching is configured
378 if (!UsingSmp() || !Config.onoff.collapsed_forwarding)
379 return 0; // no SMP collapsed forwarding possible or needed
380
381 return 16*1024; // TODO: make configurable?
382 }
383
/// initializes shared memory segment used by Transients
class TransientsRr: public Ipc::Mem::RegisteredRunner
{
public:
    /* RegisteredRunner API */
    TransientsRr(): mapOwner(NULL) {}
    virtual void run(const RunnerRegistry &);
    virtual ~TransientsRr();

protected:
    virtual void create(const RunnerRegistry &);

private:
    TransientsMap::Owner *mapOwner; ///< set by create(); deleted in our destructor
};
399
/// register TransientsRr to run during the rrAfterConfig stage
RunnerRegistrationEntry(rrAfterConfig, TransientsRr);
401
void
TransientsRr::run(const RunnerRegistry &r)
{
    // shared memory support must be configured for the Transients map to exist
    assert(Config.memShared.configured());
    Ipc::Mem::RegisteredRunner::run(r); // the base class dispatches to create()
}
408
/// creates the shared TransientsMap segment if collapsed forwarding is
/// enabled and the entry limit is sane; otherwise leaves mapOwner NULL
void
TransientsRr::create(const RunnerRegistry &)
{
    if (!Config.onoff.collapsed_forwarding)
        return; // feature disabled in squid.conf

    const int64_t entryLimit = Transients::EntryLimit();
    if (entryLimit <= 0)
        return; // no SMP configured or a misconfiguration

    Must(!mapOwner);
    mapOwner = TransientsMap::Init(MapLabel, entryLimit);
}
422
TransientsRr::~TransientsRr()
{
    // releases the shared segment ownership created by create(), if any
    delete mapOwner;
}