/*
 * Copyright (C) 1996-2025 The Squid Software Foundation and contributors
 *
 * Squid software is distributed under GPLv2+ license and includes
 * contributions from numerous individuals and organizations.
 * Please see the COPYING and CONTRIBUTORS files for details.
 */

/* DEBUG: section 72    Peer Digest Routines */

#include "squid.h"
#if USE_CACHE_DIGESTS
#include "base/IoManip.h"
#include "CacheDigest.h"
#include "CachePeer.h"
#include "event.h"
#include "FwdState.h"
#include "globals.h"
#include "HttpReply.h"
#include "HttpRequest.h"
#include "internal.h"
#include "MemObject.h"
#include "mime_header.h"
#include "neighbors.h"
#include "PeerDigest.h"
#include "Store.h"
#include "store_key_md5.h"
#include "StoreClient.h"
#include "tools.h"
#include "util.h"

/* local types */

/* local prototypes */
static time_t peerDigestIncDelay(const PeerDigest * pd);
static time_t peerDigestNewDelay(const StoreEntry * e);
static void peerDigestSetCheck(PeerDigest * pd, time_t delay);
static EVH peerDigestCheck;
static void peerDigestRequest(PeerDigest * pd);
static STCB peerDigestHandleReply;
static int peerDigestFetchReply(void *, char *, ssize_t);
int peerDigestSwapInCBlock(void *, char *, ssize_t);
int peerDigestSwapInMask(void *, char *, ssize_t);
static int peerDigestFetchedEnough(DigestFetchState * fetch, char *buf, ssize_t size, const char *step_name);
static void finishAndDeleteFetch(DigestFetchState *, const char *reason, bool sawError);
static void peerDigestFetchSetStats(DigestFetchState * fetch);
static int peerDigestSetCBlock(PeerDigest * pd, const char *buf);
static int peerDigestUseful(const PeerDigest * pd);

/* local constants */
Version const CacheDigestVer = { 5, 3 };

#define StoreDigestCBlockSize sizeof(StoreDigestCBlock)
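
/* The digest reply body starts with a fixed-size cblock (StoreDigestCBlock)
 * in network byte order that describes the bitmask which follows it; see
 * peerDigestSetCBlock() for the field-by-field conversion and validation. */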

/* min interval for requesting digests from a given peer */
static const time_t PeerDigestReqMinGap = 5 * 60; /* seconds */
/* min interval for requesting digests (cumulative request stream) */
static const time_t GlobDigestReqMinGap = 1 * 60; /* seconds */

/* local vars */

static time_t pd_last_req_time = 0; /* last call to Check */

PeerDigest::PeerDigest(CachePeer * const p):
    peer(p),
    host(peer->host) // if peer disappears, we will know its name
{
    times.initialized = squid_curtime;
}

CBDATA_CLASS_INIT(PeerDigest);

CBDATA_CLASS_INIT(DigestFetchState);
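
/* Both classes use cbdata locking: peerDigestHandleReply() relies on
 * CbcPointer<DigestFetchState> and cbdataReferenceValid() to notice when a
 * nested call has destroyed the fetch object mid-loop. */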

DigestFetchState::DigestFetchState(PeerDigest *aPd, HttpRequest *req) :
    pd(aPd),
    entry(nullptr),
    old_entry(nullptr),
    sc(nullptr),
    old_sc(nullptr),
    request(req),
    offset(0),
    mask_offset(0),
    start_time(squid_curtime),
    resp_time(0),
    expires(0),
    bufofs(0),
    state(DIGEST_READ_REPLY)
{
    HTTPMSGLOCK(request);

    sent.msg = 0;
    sent.bytes = 0;

    recv.msg = 0;
    recv.bytes = 0;

    *buf = 0;
}

DigestFetchState::~DigestFetchState()
{
    if (old_entry) {
        debugs(72, 3, "deleting old entry");
        storeUnregister(old_sc, old_entry, this);
        old_entry->releaseRequest();
        old_entry->unlock("DigestFetchState destructed old");
        old_entry = nullptr;
    }

    /* unlock everything */
    storeUnregister(sc, entry, this);

    entry->unlock("DigestFetchState destructed");
    entry = nullptr;

    HTTPMSGUNLOCK(request);
}

PeerDigest::~PeerDigest()
{
    if (times.next_check && eventFind(peerDigestCheck, this))
        eventDelete(peerDigestCheck, this);
    delete cd;
    // req_result pointer is not owned by us
}

/* called by peer to indicate that somebody actually needs this digest */
void
peerDigestNeeded(PeerDigest * pd)
{
    assert(pd);
    assert(!pd->flags.needed);
    assert(!pd->cd);

    pd->flags.needed = true;
    pd->times.needed = squid_curtime;
    peerDigestSetCheck(pd, 0); /* check asap */
}

/* increment retry delay [after an unsuccessful attempt] */
static time_t
peerDigestIncDelay(const PeerDigest * pd)
{
    assert(pd);
    return pd->times.retry_delay > 0 ?
           2 * pd->times.retry_delay : /* exponential backoff */
           PeerDigestReqMinGap; /* minimal delay */
}
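
/* With PeerDigestReqMinGap at 5 minutes, consecutive failures retry after
 * 5, 10, 20, 40, ... minutes; a successful fetch resets retry_delay to zero
 * in PeerDigest::noteFetchFinished(). */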

/* artificially increases Expires: setting to avoid race conditions
 * returns the delay till that [increased] expiration time */
static time_t
peerDigestNewDelay(const StoreEntry * e)
{
    assert(e);

    if (e->expires > 0)
        return e->expires + PeerDigestReqMinGap - squid_curtime;

    return PeerDigestReqMinGap;
}

/* registers next digest verification */
static void
peerDigestSetCheck(PeerDigest * pd, time_t delay)
{
    eventAdd("peerDigestCheck", peerDigestCheck, pd, (double) delay, 1);
    pd->times.next_check = squid_curtime + delay;
    debugs(72, 3, "peerDigestSetCheck: will check peer " << pd->host << " in " << delay << " secs");
}

/* callback for eventAdd() (with peer digest locked)
 * request new digest if our copy is too old or if we lack one;
 * schedule next check otherwise */
static void
peerDigestCheck(void *data)
{
    PeerDigest *pd = (PeerDigest *)data;
    time_t req_time;

    assert(!pd->flags.requested);

    pd->times.next_check = 0; /* unknown */

    Assure(pd->peer.valid());

    debugs(72, 3, "cache_peer " << RawPointer(pd->peer).orNil());
    debugs(72, 3, "peerDigestCheck: time: " << squid_curtime <<
           ", last received: " << (long int) pd->times.received << " (" <<
           std::showpos << (int) (squid_curtime - pd->times.received) << ")");

    /* decide when we should send the request:
     * request now unless too close to other requests */
    req_time = squid_curtime;

    /* per-peer limit */

    if (req_time - pd->times.received < PeerDigestReqMinGap) {
        debugs(72, 2, "peerDigestCheck: " << pd->host <<
               ", avoiding close peer requests (" <<
               (int) (req_time - pd->times.received) << " < " <<
               (int) PeerDigestReqMinGap << " secs).");

        req_time = pd->times.received + PeerDigestReqMinGap;
    }

    /* global limit */
    if (req_time - pd_last_req_time < GlobDigestReqMinGap) {
        debugs(72, 2, "peerDigestCheck: " << pd->host <<
               ", avoiding close requests (" <<
               (int) (req_time - pd_last_req_time) << " < " <<
               (int) GlobDigestReqMinGap << " secs).");

        req_time = pd_last_req_time + GlobDigestReqMinGap;
    }

    if (req_time <= squid_curtime)
        peerDigestRequest(pd); /* will set pd->flags.requested */
    else
        peerDigestSetCheck(pd, req_time - squid_curtime);
}
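
/* Example: if this peer's digest arrived 2 minutes ago, the per-peer limit
 * above defers the request for the remaining 3 minutes of the 5-minute
 * PeerDigestReqMinGap; the global limit may push it out further if any
 * digest was requested less than GlobDigestReqMinGap (1 minute) ago. */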

/* ask store for a digest */
static void
peerDigestRequest(PeerDigest * pd)
{
    const auto p = pd->peer.get(); // TODO: Replace with a reference.
    StoreEntry *e, *old_e;
    char *url = nullptr;
    HttpRequest *req;
    StoreIOBuffer tempBuffer;

    pd->req_result = nullptr;
    pd->flags.requested = true;

    /* compute future request components */

    if (p->digest_url)
        url = xstrdup(p->digest_url);
    else
        url = xstrdup(internalRemoteUri(p->secure.encryptTransport, p->host, p->http_port, "/squid-internal-periodic/", SBuf(StoreDigestFileName)));
    debugs(72, 2, url);

    const auto mx = MasterXaction::MakePortless<XactionInitiator::initCacheDigest>();
    req = HttpRequest::FromUrlXXX(url, mx);

    assert(req);

    /* add custom headers */
    assert(!req->header.len);

    req->header.putStr(Http::HdrType::ACCEPT, StoreDigestMimeStr);

    req->header.putStr(Http::HdrType::ACCEPT, "text/html");

    if (p->login &&
            p->login[0] != '*' &&
            strcmp(p->login, "PASS") != 0 &&
            strcmp(p->login, "PASSTHRU") != 0 &&
            strncmp(p->login, "NEGOTIATE", 9) != 0 &&
            strcmp(p->login, "PROXYPASS") != 0) {
        req->url.userInfo(SBuf(p->login)); // XXX: performance regression, make peer login SBuf as well.
    }
    /* create fetch state structure */
    DigestFetchState *fetch = new DigestFetchState(pd, req);

    /* update timestamps */
    pd->times.requested = squid_curtime;
    pd_last_req_time = squid_curtime;
    req->flags.cachable.support(); // prevent RELEASE_REQUEST in storeCreateEntry()

    /* the rest is based on clientReplyContext::processExpired() */
    req->flags.refresh = true;

    old_e = fetch->old_entry = storeGetPublicByRequest(req);

    // XXX: Missing a hittingRequiresCollapsing() && startCollapsingOn() check.
    if (old_e) {
        debugs(72, 5, "found old " << *old_e);

        old_e->lock("peerDigestRequest");
        old_e->ensureMemObject(url, url, req->method);

        fetch->old_sc = storeClientListAdd(old_e, fetch);
    }

    e = fetch->entry = storeCreateEntry(url, url, req->flags, req->method);
    debugs(72, 5, "created " << *e);
    assert(EBIT_TEST(e->flags, KEY_PRIVATE));
    fetch->sc = storeClientListAdd(e, fetch);
    /* set lastmod to trigger IMS request if possible */

    // TODO: Also check for fetch->pd->cd presence as a precondition for sending
    // IMS requests because peerDigestFetchReply() does not accept 304 responses
    // without an in-memory cache digest.
    if (old_e)
        e->lastModified(old_e->lastModified());

    /* push towards peer cache */
    FwdState::fwdStart(Comm::ConnectionPointer(), e, req);

    tempBuffer.offset = 0;

    tempBuffer.length = SM_PAGE_SIZE;

    tempBuffer.data = fetch->buf;

    storeClientCopy(fetch->sc, e, tempBuffer,
                    peerDigestHandleReply, fetch);

    safe_free(url);
}
/* Handle the data copying ... */

/*
 * This routine receives the copied data and dispatches it to a
 * handler function according to the current copy state. It also
 * tracks the buffer offset and the amount of data seen, so the
 * individual handlers do not have to.
 */
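/*
 * The parsing state machine advances through
 *     DIGEST_READ_REPLY -> DIGEST_READ_CBLOCK -> DIGEST_READ_MASK,
 * with DIGEST_READ_NONE acting as a terminal "stop parsing" state.
 */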
static void
peerDigestHandleReply(void *data, StoreIOBuffer receivedData)
{
    DigestFetchState *fetch = (DigestFetchState *)data;
    int retsize = -1;
    digest_read_state_t prevstate;
    int newsize;

    if (receivedData.flags.error) {
        finishAndDeleteFetch(fetch, "failure loading digest reply from Store", true);
        return;
    }

    if (!fetch->pd) {
        finishAndDeleteFetch(fetch, "digest disappeared while loading digest reply from Store", true);
        return;
    }

    /* The existing code assumes that the received pointer is
     * where we asked the data to be put
     */
    assert(!receivedData.data || fetch->buf + fetch->bufofs == receivedData.data);

    /* Update the buffer size */
    fetch->bufofs += receivedData.length;

    assert(fetch->bufofs <= SM_PAGE_SIZE);

    /* If we've fetched enough, return */

    if (peerDigestFetchedEnough(fetch, fetch->buf, fetch->bufofs, "peerDigestHandleReply"))
        return;

    /* Call the right function based on the state */
    /* (Those functions will update the state if needed) */

    /* Give us a temporary reference. Some of the calls we make may
     * try to destroy the fetch structure, and we like to know if they
     * do
     */
    CbcPointer<DigestFetchState> tmpLock = fetch;

    /* Repeat this loop until we're out of data OR the state changes */
    /* (So keep going if the state has changed and we still have data) */
    do {
        prevstate = fetch->state;

        switch (fetch->state) {

        case DIGEST_READ_REPLY:
            retsize = peerDigestFetchReply(fetch, fetch->buf, fetch->bufofs);
            break;

        case DIGEST_READ_CBLOCK:
            retsize = peerDigestSwapInCBlock(fetch, fetch->buf, fetch->bufofs);
            break;

        case DIGEST_READ_MASK:
            retsize = peerDigestSwapInMask(fetch, fetch->buf, fetch->bufofs);
            break;

        case DIGEST_READ_NONE:
            break;

        default:
            fatal("Bad digest transfer mode!\n");
        }

        if (retsize < 0)
            return;

        /*
         * The returned size indicates how much of the buffer was read -
         * so move the remainder of the buffer to the beginning
         * and update the bufofs / bufsize
         */
        newsize = fetch->bufofs - retsize;

        /* the remainder is newsize bytes long, starting at buf + retsize */
        memmove(fetch->buf, fetch->buf + retsize, newsize);

        fetch->bufofs = newsize;

    } while (cbdataReferenceValid(fetch) && prevstate != fetch->state && fetch->bufofs > 0);

    // Check for EOF here, thus giving the parser one extra run. We could avoid this overhead by
    // checking at the beginning of this function. However, in this case, we would have to require
    // that the parser does not regard EOF as a special condition (it is true now but may change
    // in the future).
    if (fetch->sc->atEof()) {
        finishAndDeleteFetch(fetch, "premature end of digest reply", true);
        return;
    }

    /* Update the copy offset */
    fetch->offset += receivedData.length;

    /* Schedule another copy */
    if (cbdataReferenceValid(fetch)) {
        StoreIOBuffer tempBuffer;
        tempBuffer.offset = fetch->offset;
        tempBuffer.length = SM_PAGE_SIZE - fetch->bufofs;
        tempBuffer.data = fetch->buf + fetch->bufofs;
        storeClientCopy(fetch->sc, fetch->entry, tempBuffer,
                        peerDigestHandleReply, fetch);
    }
}
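
/* Each parsing step below returns the number of buffered bytes it consumed,
 * zero when it needs more data, or a negative value after the fetch has been
 * finished (and usually deleted) via finishAndDeleteFetch(). */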

/// handle HTTP response headers in the initial storeClientCopy() response
static int
peerDigestFetchReply(void *data, char *buf, ssize_t size)
{
    DigestFetchState *fetch = (DigestFetchState *)data;
    const auto pd = fetch->pd.get();
    assert(pd && buf);
    assert(!fetch->offset);

    assert(fetch->state == DIGEST_READ_REPLY);

    if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestFetchReply"))
        return -1;

    {
        const auto &reply = fetch->entry->mem().freshestReply();
        const auto status = reply.sline.status();
        assert(status != Http::scNone);
        debugs(72, 3, "peerDigestFetchReply: " << pd->host << " status: " << status <<
               ", expires: " << (long int) reply.expires << " (" << std::showpos <<
               (int) (reply.expires - squid_curtime) << ")");

        /* this "if" is based on clientHandleIMSReply() */

        if (status == Http::scNotModified) {
            /* our old entry is fine */
            assert(fetch->old_entry);

            if (!fetch->old_entry->mem_obj->request)
                fetch->old_entry->mem_obj->request = fetch->entry->mem_obj->request;

            assert(fetch->old_entry->mem_obj->request);

            if (!Store::Root().updateOnNotModified(fetch->old_entry, *fetch->entry)) {
                finishAndDeleteFetch(fetch, "header update failure after a 304 response", true);
                return -1;
            }

            /* get rid of 304 reply */
            storeUnregister(fetch->sc, fetch->entry, fetch);

            fetch->entry->unlock("peerDigestFetchReply 304");

            fetch->entry = fetch->old_entry;

            fetch->old_entry = nullptr;

            /* preserve request -- we need its size to update counters */
            /* requestUnlink(r); */
            /* fetch->entry->mem_obj->request = nullptr; */

            if (!fetch->pd->cd) {
                finishAndDeleteFetch(fetch, "304 without the old in-memory digest", true);
                return -1;
            }

            // stay with the old in-memory digest
            finishAndDeleteFetch(fetch, "Not modified", false);
            return -1;
        } else if (status == Http::scOkay) {
            /* get rid of old entry if any */

            if (fetch->old_entry) {
                debugs(72, 3, "peerDigestFetchReply: got new digest, releasing old one");
                storeUnregister(fetch->old_sc, fetch->old_entry, fetch);
                fetch->old_entry->releaseRequest();
                fetch->old_entry->unlock("peerDigestFetchReply 200");
                fetch->old_entry = nullptr;
            }

            fetch->state = DIGEST_READ_CBLOCK;
        } else {
            /* some kind of a bug */
            finishAndDeleteFetch(fetch, reply.sline.reason(), true);
            return -1; /* XXX -1 will abort stuff in ReadReply! */
        }
    }

    return 0; // we consumed/used no buffered bytes
}

int
peerDigestSwapInCBlock(void *data, char *buf, ssize_t size)
{
    DigestFetchState *fetch = (DigestFetchState *)data;

    assert(fetch->state == DIGEST_READ_CBLOCK);

    if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestSwapInCBlock"))
        return -1;

    if (size >= (ssize_t)StoreDigestCBlockSize) {
        const auto pd = fetch->pd.get();

        assert(pd);
        assert(fetch->entry->mem_obj);

        if (peerDigestSetCBlock(pd, buf)) {
            /* XXX: soon we will have variable header size */
            /* switch to CD buffer and fetch digest guts */
            buf = nullptr;
            assert(pd->cd->mask);
            fetch->state = DIGEST_READ_MASK;
            return StoreDigestCBlockSize;
        } else {
            finishAndDeleteFetch(fetch, "invalid digest cblock", true);
            return -1;
        }
    }

    /* need more data, do we have space? */
    if (size >= SM_PAGE_SIZE) {
        finishAndDeleteFetch(fetch, "digest cblock too big", true);
        return -1;
    }

    return 0; /* We need more data */
}

int
peerDigestSwapInMask(void *data, char *buf, ssize_t size)
{
    DigestFetchState *fetch = (DigestFetchState *)data;
    const auto pd = fetch->pd.get();
    assert(pd);
    assert(pd->cd && pd->cd->mask);

    /*
     * NOTE: buf does not point into pd->cd->mask, so we must copy the
     * received bytes ourselves, appending them at mask_offset.
     */
    memcpy(pd->cd->mask + fetch->mask_offset, buf, size);

    if (peerDigestFetchedEnough(fetch, nullptr, size, "peerDigestSwapInMask"))
        return -1;

    fetch->mask_offset += size;

    if (fetch->mask_offset >= pd->cd->mask_size) {
        debugs(72, 2, "peerDigestSwapInMask: Done! Got " <<
               fetch->mask_offset << ", expected " << pd->cd->mask_size);
        assert(fetch->mask_offset == pd->cd->mask_size);
        assert(peerDigestFetchedEnough(fetch, nullptr, 0, "peerDigestSwapInMask"));
        return -1; /* XXX! */
    }

    /* We always read everything, so return size */
    return size;
}

static int
peerDigestFetchedEnough(DigestFetchState * fetch, char *, ssize_t size, const char *step_name)
{
    static const SBuf hostUnknown("<unknown>"); // peer host (if any)
    SBuf host = hostUnknown;

    const auto pd = fetch->pd.get();
    Assure(pd);
    const char *reason = nullptr; /* reason for completion */
    const char *no_bug = nullptr; /* successful completion if set */

    /* test possible exit conditions (the same for most steps!)
     * cases marked with '?!' should not happen */

    if (!reason) {
        host = pd->host;
    }

    debugs(72, 6, step_name << ": peer " << host << ", offset: " <<
           fetch->offset << " size: " << size << ".");

    /* continue checking (with pd and host known and valid) */

    if (!reason) {
        if (size < 0)
            reason = "swap failure";
        else if (!fetch->entry)
            reason = "swap aborted?!";
        else if (EBIT_TEST(fetch->entry->flags, ENTRY_ABORTED))
            reason = "swap aborted";
    }

    /* continue checking (maybe-successful eof case) */
    if (!reason && !size && fetch->state != DIGEST_READ_REPLY) {
        if (!pd->cd)
            reason = "null digest?!";
        else if (fetch->mask_offset != pd->cd->mask_size)
            reason = "premature end of digest?!";
        else if (!peerDigestUseful(pd))
            reason = "useless digest";
        else
            reason = no_bug = "success";
    }

    /* finish if we have a reason */
    if (reason) {
        const int level = strstr(reason, "?!") ? 1 : 3;
        debugs(72, level, "" << step_name << ": peer " << host << ", exiting after '" << reason << "'");
        finishAndDeleteFetch(fetch, reason, !no_bug);
    }

    return reason != nullptr;
}

/* complete the digest transfer, update stats, unlock/release everything */
static void
finishAndDeleteFetch(DigestFetchState * const fetch, const char * const reason, const bool err)
{
    assert(reason);

    debugs(72, 2, "peer: " << RawPointer(fetch->pd.valid() ? fetch->pd->peer : nullptr).orNil() << ", reason: " << reason << ", err: " << err);

    /* note: order is significant */
    peerDigestFetchSetStats(fetch);
    if (const auto pd = fetch->pd.get())
        pd->noteFetchFinished(*fetch, reason, err);

    delete fetch;
}

void
PeerDigest::noteFetchFinished(const DigestFetchState &finishedFetch, const char * const outcomeDescription, const bool sawError)
{
    const auto pd = this; // TODO: remove this diff reducer
    const auto fetch = &finishedFetch; // TODO: remove this diff reducer

    pd->flags.requested = false;
    pd->req_result = outcomeDescription;

    pd->times.received = squid_curtime;
    pd->times.req_delay = fetch->resp_time;
    pd->stats.sent.kbytes += fetch->sent.bytes;
    pd->stats.recv.kbytes += fetch->recv.bytes;
    pd->stats.sent.msgs += fetch->sent.msg;
    pd->stats.recv.msgs += fetch->recv.msg;

    if (sawError) {
        debugs(72, DBG_IMPORTANT, "disabling (" << outcomeDescription << ") digest from " << host);

        pd->times.retry_delay = peerDigestIncDelay(pd);
        peerDigestSetCheck(pd, pd->times.retry_delay);
        delete pd->cd;
        pd->cd = nullptr;

        pd->flags.usable = false;
    } else {
        pd->flags.usable = true;
        pd->times.retry_delay = 0;
        peerDigestSetCheck(pd, peerDigestNewDelay(fetch->entry));

        /* XXX: ugly condition, but how? */

        if (fetch->entry->store_status == STORE_OK)
            debugs(72, 2, "re-used old digest from " << host);
        else
            debugs(72, 2, "received valid digest from " << host);
    }
}

/* calculate fetch stats after completion */
static void
peerDigestFetchSetStats(DigestFetchState * fetch)
{
    MemObject *mem;
    assert(fetch->entry && fetch->request);

    mem = fetch->entry->mem_obj;
    assert(mem);

    /* XXX: outgoing numbers are not precise */
    /* XXX: we must distinguish between 304 hits and misses here */
    fetch->sent.bytes = fetch->request->prefixLen();
    /* XXX: this is slightly wrong: we don't KNOW that the entire memobject
     * was fetched. We only know how big it is
     */
    fetch->recv.bytes = mem->size();
    fetch->sent.msg = fetch->recv.msg = 1;
    fetch->expires = fetch->entry->expires;
    fetch->resp_time = squid_curtime - fetch->start_time;

    debugs(72, 3, "peerDigestFetchFinish: recv " << fetch->recv.bytes <<
           " bytes in " << (int) fetch->resp_time << " secs");

    debugs(72, 3, "peerDigestFetchFinish: expires: " <<
           (long int) fetch->expires << " (" << std::showpos <<
           (int) (fetch->expires - squid_curtime) << "), lmt: " <<
           std::noshowpos << (long int) fetch->entry->lastModified() << " (" <<
           std::showpos << (int) (fetch->entry->lastModified() - squid_curtime) <<
           ")");

    statCounter.cd.kbytes_sent += fetch->sent.bytes;
    statCounter.cd.kbytes_recv += fetch->recv.bytes;
    statCounter.cd.msgs_sent += fetch->sent.msg;
    statCounter.cd.msgs_recv += fetch->recv.msg;
}

static int
peerDigestSetCBlock(PeerDigest * pd, const char *buf)
{
    StoreDigestCBlock cblock;
    int freed_size = 0;
    const auto host = pd->host;

    memcpy(&cblock, buf, sizeof(cblock));
    /* network -> host conversions */
    cblock.ver.current = ntohs(cblock.ver.current);
    cblock.ver.required = ntohs(cblock.ver.required);
    cblock.capacity = ntohl(cblock.capacity);
    cblock.count = ntohl(cblock.count);
    cblock.del_count = ntohl(cblock.del_count);
    cblock.mask_size = ntohl(cblock.mask_size);
    debugs(72, 2, "got digest cblock from " << host << "; ver: " <<
           (int) cblock.ver.current << " (req: " << (int) cblock.ver.required <<
           ")");

    debugs(72, 2, "\t size: " <<
           cblock.mask_size << " bytes, e-cnt: " <<
           cblock.count << ", e-util: " <<
           xpercentInt(cblock.count, cblock.capacity) << "%");
    /* check version requirements (both ways) */

    if (cblock.ver.required > CacheDigestVer.current) {
        debugs(72, DBG_IMPORTANT, "" << host << " digest requires version " <<
               cblock.ver.required << "; have: " << CacheDigestVer.current);

        return 0;
    }

    if (cblock.ver.current < CacheDigestVer.required) {
        debugs(72, DBG_IMPORTANT, "" << host << " digest is version " <<
               cblock.ver.current << "; we require: " <<
               CacheDigestVer.required);

        return 0;
    }

    /* check consistency */
    if (cblock.ver.required > cblock.ver.current ||
            cblock.mask_size <= 0 || cblock.capacity <= 0 ||
            cblock.bits_per_entry <= 0 || cblock.hash_func_count <= 0) {
        debugs(72, DBG_CRITICAL, "" << host << " digest cblock is corrupted.");
        return 0;
    }

    /* check consistency further */
    if ((size_t)cblock.mask_size != CacheDigest::CalcMaskSize(cblock.capacity, cblock.bits_per_entry)) {
        debugs(72, DBG_CRITICAL, host << " digest cblock is corrupted " <<
               "(mask size mismatch: " << cblock.mask_size << " ? " <<
               CacheDigest::CalcMaskSize(cblock.capacity, cblock.bits_per_entry)
               << ").");
        return 0;
    }

    /* there are some things we cannot do yet */
    if (cblock.hash_func_count != CacheDigestHashFuncCount) {
        debugs(72, DBG_CRITICAL, "ERROR: " << host << " digest: unsupported #hash functions: " <<
               cblock.hash_func_count << " ? " << CacheDigestHashFuncCount << ".");
        return 0;
    }

    /*
     * no cblock bugs below this point
     */
    /* check size changes */
    if (pd->cd && cblock.mask_size != (ssize_t)pd->cd->mask_size) {
        debugs(72, 2, host << " digest changed size: " << cblock.mask_size <<
               " -> " << pd->cd->mask_size);
        freed_size = pd->cd->mask_size;
        delete pd->cd;
        pd->cd = nullptr;
    }

    if (!pd->cd) {
        debugs(72, 2, "creating " << host << " digest; size: " << cblock.mask_size << " (" <<
               std::showpos << (int) (cblock.mask_size - freed_size) << ") bytes");
        pd->cd = new CacheDigest(cblock.capacity, cblock.bits_per_entry);

        if (cblock.mask_size >= freed_size)
            statCounter.cd.memory += (cblock.mask_size - freed_size);
    }

    assert(pd->cd);
    /* these assignments leave us in an inconsistent state until we finish reading the digest */
    pd->cd->count = cblock.count;
    pd->cd->del_count = cblock.del_count;
    return 1;
}

static int
peerDigestUseful(const PeerDigest * pd)
{
    /* TODO: we should calculate the prob of a false hit instead of bit util */
    const auto bit_util = pd->cd->usedMaskPercent();

    if (bit_util > 65.0) {
        debugs(72, DBG_CRITICAL, "WARNING: " << pd->host <<
               " peer digest has too many bits on (" << bit_util << "%).");
        return 0;
    }

    return 1;
}
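
/* A rough feel for the 65% threshold: a digest lookup is a false hit when
 * all of its hash positions happen to be set, i.e. with probability near
 * bit_util^N for N hash functions. Assuming the usual
 * CacheDigestHashFuncCount of 4, a 65% bit utilization already implies
 * roughly a 0.65^4 = 18% false-hit rate. */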

static int
saneDiff(time_t diff)
{
    return abs((int) diff) > squid_curtime / 2 ? 0 : diff;
}

void
peerDigestStatsReport(const PeerDigest * pd, StoreEntry * e)
{
#define f2s(flag) (pd->flags.flag ? "yes" : "no")
#define appendTime(tm) storeAppendPrintf(e, "%s\t %10ld\t %+d\t %+d\n", \
                                         ""#tm, (long int)pd->times.tm, \
                                         saneDiff(pd->times.tm - squid_curtime), \
                                         saneDiff(pd->times.tm - pd->times.initialized))

    assert(pd);

    auto host = pd->host;
    storeAppendPrintf(e, "\npeer digest from " SQUIDSBUFPH "\n", SQUIDSBUFPRINT(host));

    cacheDigestGuessStatsReport(&pd->stats.guess, e, host);

    storeAppendPrintf(e, "\nevent\t timestamp\t secs from now\t secs from init\n");
    appendTime(initialized);
    appendTime(needed);
    appendTime(requested);
    appendTime(received);
    appendTime(next_check);

    storeAppendPrintf(e, "peer digest state:\n");
    storeAppendPrintf(e, "\tneeded: %3s, usable: %3s, requested: %3s\n",
                      f2s(needed), f2s(usable), f2s(requested));
    storeAppendPrintf(e, "\n\tlast retry delay: %d secs\n",
                      (int) pd->times.retry_delay);
    storeAppendPrintf(e, "\tlast request response time: %d secs\n",
                      (int) pd->times.req_delay);
    storeAppendPrintf(e, "\tlast request result: %s\n",
                      pd->req_result ? pd->req_result : "(none)");

    storeAppendPrintf(e, "\npeer digest traffic:\n");
    storeAppendPrintf(e, "\trequests sent: %d, volume: %d KB\n",
                      pd->stats.sent.msgs, (int) pd->stats.sent.kbytes.kb);
    storeAppendPrintf(e, "\treplies recv: %d, volume: %d KB\n",
                      pd->stats.recv.msgs, (int) pd->stats.recv.kbytes.kb);

    storeAppendPrintf(e, "\npeer digest structure:\n");

    if (pd->cd)
        cacheDigestReport(pd->cd, host, e);
    else
        storeAppendPrintf(e, "\tno in-memory copy\n");
}

#endif /* USE_CACHE_DIGESTS */