]> git.ipfire.org Git - thirdparty/squid.git/blob - src/peer_digest.cc
f1c71283b7464b91adaf785a6d0b226143ff1d49
[thirdparty/squid.git] / src / peer_digest.cc
1 /*
2 * Copyright (C) 1996-2025 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 72 Peer Digest Routines */
10
11 #include "squid.h"
12 #if USE_CACHE_DIGESTS
13 #include "base/IoManip.h"
14 #include "CacheDigest.h"
15 #include "CachePeer.h"
16 #include "event.h"
17 #include "FwdState.h"
18 #include "globals.h"
19 #include "HttpReply.h"
20 #include "HttpRequest.h"
21 #include "internal.h"
22 #include "MemObject.h"
23 #include "mime_header.h"
24 #include "neighbors.h"
25 #include "PeerDigest.h"
26 #include "Store.h"
27 #include "store_key_md5.h"
28 #include "StoreClient.h"
29 #include "tools.h"
30 #include "util.h"
31
32 /* local types */
33
34 /* local prototypes */
35 static time_t peerDigestIncDelay(const PeerDigest * pd);
36 static time_t peerDigestNewDelay(const StoreEntry * e);
37 static void peerDigestSetCheck(PeerDigest * pd, time_t delay);
38 static EVH peerDigestCheck;
39 static void peerDigestRequest(PeerDigest * pd);
40 static STCB peerDigestHandleReply;
41 static int peerDigestFetchReply(void *, char *, ssize_t);
42 int peerDigestSwapInCBlock(void *, char *, ssize_t);
43 int peerDigestSwapInMask(void *, char *, ssize_t);
44 static int peerDigestFetchedEnough(DigestFetchState * fetch, char *buf, ssize_t size, const char *step_name);
45 static void finishAndDeleteFetch(DigestFetchState *, const char *reason, bool sawError);
46 static void peerDigestFetchSetStats(DigestFetchState * fetch);
47 static int peerDigestSetCBlock(PeerDigest * pd, const char *buf);
48 static int peerDigestUseful(const PeerDigest * pd);
49
50 /* local constants */
51 Version const CacheDigestVer = { 5, 3 };
52
53 #define StoreDigestCBlockSize sizeof(StoreDigestCBlock)
54
55 /* min interval for requesting digests from a given peer */
56 static const time_t PeerDigestReqMinGap = 5 * 60; /* seconds */
57 /* min interval for requesting digests (cumulative request stream) */
58 static const time_t GlobDigestReqMinGap = 1 * 60; /* seconds */
59
60 /* local vars */
61
62 static time_t pd_last_req_time = 0; /* last call to Check */
63
64 PeerDigest::PeerDigest(CachePeer * const p):
65 peer(p),
66 host(peer->host) // if peer disappears, we will know its name
67 {
68 times.initialized = squid_curtime;
69 }
70
71 CBDATA_CLASS_INIT(PeerDigest);
72
73 CBDATA_CLASS_INIT(DigestFetchState);
74
75 DigestFetchState::DigestFetchState(PeerDigest *aPd, HttpRequest *req) :
76 pd(aPd),
77 entry(nullptr),
78 old_entry(nullptr),
79 sc(nullptr),
80 old_sc(nullptr),
81 request(req),
82 offset(0),
83 mask_offset(0),
84 start_time(squid_curtime),
85 resp_time(0),
86 expires(0),
87 bufofs(0),
88 state(DIGEST_READ_REPLY)
89 {
90 HTTPMSGLOCK(request);
91
92 sent.msg = 0;
93 sent.bytes = 0;
94
95 recv.msg = 0;
96 recv.bytes = 0;
97
98 *buf = 0;
99 }
100
101 DigestFetchState::~DigestFetchState()
102 {
103 if (old_entry) {
104 debugs(72, 3, "deleting old entry");
105 storeUnregister(old_sc, old_entry, this);
106 old_entry->releaseRequest();
107 old_entry->unlock("DigestFetchState destructed old");
108 old_entry = nullptr;
109 }
110
111 /* unlock everything */
112 storeUnregister(sc, entry, this);
113
114 entry->unlock("DigestFetchState destructed");
115 entry = nullptr;
116
117 HTTPMSGUNLOCK(request);
118 }
119
120 PeerDigest::~PeerDigest()
121 {
122 if (times.next_check && eventFind(peerDigestCheck, this))
123 eventDelete(peerDigestCheck, this);
124 delete cd;
125 // req_result pointer is not owned by us
126 }
127
128 /* called by peer to indicate that somebody actually needs this digest */
129 void
130 peerDigestNeeded(PeerDigest * pd)
131 {
132 assert(pd);
133 assert(!pd->flags.needed);
134 assert(!pd->cd);
135
136 pd->flags.needed = true;
137 pd->times.needed = squid_curtime;
138 peerDigestSetCheck(pd, 0); /* check asap */
139 }
140
141 /* increment retry delay [after an unsuccessful attempt] */
142 static time_t
143 peerDigestIncDelay(const PeerDigest * pd)
144 {
145 assert(pd);
146 return pd->times.retry_delay > 0 ?
147 2 * pd->times.retry_delay : /* exponential backoff */
148 PeerDigestReqMinGap; /* minimal delay */
149 }
150
151 /* artificially increases Expires: setting to avoid race conditions
152 * returns the delay till that [increased] expiration time */
153 static time_t
154 peerDigestNewDelay(const StoreEntry * e)
155 {
156 assert(e);
157
158 if (e->expires > 0)
159 return e->expires + PeerDigestReqMinGap - squid_curtime;
160
161 return PeerDigestReqMinGap;
162 }
163
164 /* registers next digest verification */
165 static void
166 peerDigestSetCheck(PeerDigest * pd, time_t delay)
167 {
168 eventAdd("peerDigestCheck", peerDigestCheck, pd, (double) delay, 1);
169 pd->times.next_check = squid_curtime + delay;
170 debugs(72, 3, "peerDigestSetCheck: will check peer " << pd->host << " in " << delay << " secs");
171 }
172
173 /* callback for eventAdd() (with peer digest locked)
174 * request new digest if our copy is too old or if we lack one;
175 * schedule next check otherwise */
176 static void
177 peerDigestCheck(void *data)
178 {
179 PeerDigest *pd = (PeerDigest *)data;
180 time_t req_time;
181
182 assert(!pd->flags.requested);
183
184 pd->times.next_check = 0; /* unknown */
185
186 Assure(pd->peer.valid());
187
188 debugs(72, 3, "cache_peer " << RawPointer(pd->peer).orNil());
189 debugs(72, 3, "peerDigestCheck: time: " << squid_curtime <<
190 ", last received: " << (long int) pd->times.received << " (" <<
191 std::showpos << (int) (squid_curtime - pd->times.received) << ")");
192
193 /* decide when we should send the request:
194 * request now unless too close to other requests */
195 req_time = squid_curtime;
196
197 /* per-peer limit */
198
199 if (req_time - pd->times.received < PeerDigestReqMinGap) {
200 debugs(72, 2, "peerDigestCheck: " << pd->host <<
201 ", avoiding close peer requests (" <<
202 (int) (req_time - pd->times.received) << " < " <<
203 (int) PeerDigestReqMinGap << " secs).");
204
205 req_time = pd->times.received + PeerDigestReqMinGap;
206 }
207
208 /* global limit */
209 if (req_time - pd_last_req_time < GlobDigestReqMinGap) {
210 debugs(72, 2, "peerDigestCheck: " << pd->host <<
211 ", avoiding close requests (" <<
212 (int) (req_time - pd_last_req_time) << " < " <<
213 (int) GlobDigestReqMinGap << " secs).");
214
215 req_time = pd_last_req_time + GlobDigestReqMinGap;
216 }
217
218 if (req_time <= squid_curtime)
219 peerDigestRequest(pd); /* will set pd->flags.requested */
220 else
221 peerDigestSetCheck(pd, req_time - squid_curtime);
222 }
223
224 /* ask store for a digest */
225 static void
226 peerDigestRequest(PeerDigest * pd)
227 {
228 const auto p = pd->peer.get(); // TODO: Replace with a reference.
229 StoreEntry *e, *old_e;
230 char *url = nullptr;
231 HttpRequest *req;
232 StoreIOBuffer tempBuffer;
233
234 pd->req_result = nullptr;
235 pd->flags.requested = true;
236
237 /* compute future request components */
238
239 if (p->digest_url)
240 url = xstrdup(p->digest_url);
241 else
242 url = xstrdup(internalRemoteUri(p->secure.encryptTransport, p->host, p->http_port, "/squid-internal-periodic/", SBuf(StoreDigestFileName)));
243 debugs(72, 2, url);
244
245 const auto mx = MasterXaction::MakePortless<XactionInitiator::initCacheDigest>();
246 req = HttpRequest::FromUrlXXX(url, mx);
247
248 assert(req);
249
250 /* add custom headers */
251 assert(!req->header.len);
252
253 req->header.putStr(Http::HdrType::ACCEPT, StoreDigestMimeStr);
254
255 req->header.putStr(Http::HdrType::ACCEPT, "text/html");
256
257 if (p->login &&
258 p->login[0] != '*' &&
259 strcmp(p->login, "PASS") != 0 &&
260 strcmp(p->login, "PASSTHRU") != 0 &&
261 strncmp(p->login, "NEGOTIATE",9) != 0 &&
262 strcmp(p->login, "PROXYPASS") != 0) {
263 req->url.userInfo(SBuf(p->login)); // XXX: performance regression make peer login SBuf as well.
264 }
265 /* create fetch state structure */
266 DigestFetchState *fetch = new DigestFetchState(pd, req);
267
268 /* update timestamps */
269 pd->times.requested = squid_curtime;
270 pd_last_req_time = squid_curtime;
271 req->flags.cachable.support(); // prevent RELEASE_REQUEST in storeCreateEntry()
272
273 /* the rest is based on clientReplyContext::processExpired() */
274 req->flags.refresh = true;
275
276 old_e = fetch->old_entry = storeGetPublicByRequest(req);
277
278 // XXX: Missing a hittingRequiresCollapsing() && startCollapsingOn() check.
279 if (old_e) {
280 debugs(72, 5, "found old " << *old_e);
281
282 old_e->lock("peerDigestRequest");
283 old_e->ensureMemObject(url, url, req->method);
284
285 fetch->old_sc = storeClientListAdd(old_e, fetch);
286 }
287
288 e = fetch->entry = storeCreateEntry(url, url, req->flags, req->method);
289 debugs(72, 5, "created " << *e);
290 assert(EBIT_TEST(e->flags, KEY_PRIVATE));
291 fetch->sc = storeClientListAdd(e, fetch);
292 /* set lastmod to trigger IMS request if possible */
293
294 // TODO: Also check for fetch->pd->cd presence as a precondition for sending
295 // IMS requests because peerDigestFetchReply() does not accept 304 responses
296 // without an in-memory cache digest.
297 if (old_e)
298 e->lastModified(old_e->lastModified());
299
300 /* push towards peer cache */
301 FwdState::fwdStart(Comm::ConnectionPointer(), e, req);
302
303 tempBuffer.offset = 0;
304
305 tempBuffer.length = SM_PAGE_SIZE;
306
307 tempBuffer.data = fetch->buf;
308
309 storeClientCopy(fetch->sc, e, tempBuffer,
310 peerDigestHandleReply, fetch);
311
312 safe_free(url);
313 }
314
315 /* Handle the data copying .. */
316
317 /*
318 * This routine handles the copy data and then redirects the
319 * copy to a bunch of subfunctions depending upon the copy state.
320 * It also tracks the buffer offset and "seen", since I'm actually
321 * not interested in rewriting everything to suit my little idea.
322 */
323 static void
324 peerDigestHandleReply(void *data, StoreIOBuffer receivedData)
325 {
326 DigestFetchState *fetch = (DigestFetchState *)data;
327 int retsize = -1;
328 digest_read_state_t prevstate;
329 int newsize;
330
331 if (receivedData.flags.error) {
332 finishAndDeleteFetch(fetch, "failure loading digest reply from Store", true);
333 return;
334 }
335
336 if (!fetch->pd) {
337 finishAndDeleteFetch(fetch, "digest disappeared while loading digest reply from Store", true);
338 return;
339 }
340
341 /* The existing code assumes that the received pointer is
342 * where we asked the data to be put
343 */
344 assert(!receivedData.data || fetch->buf + fetch->bufofs == receivedData.data);
345
346 /* Update the buffer size */
347 fetch->bufofs += receivedData.length;
348
349 assert(fetch->bufofs <= SM_PAGE_SIZE);
350
351 /* If we've fetched enough, return */
352
353 if (peerDigestFetchedEnough(fetch, fetch->buf, fetch->bufofs, "peerDigestHandleReply"))
354 return;
355
356 /* Call the right function based on the state */
357 /* (Those functions will update the state if needed) */
358
359 /* Give us a temporary reference. Some of the calls we make may
360 * try to destroy the fetch structure, and we like to know if they
361 * do
362 */
363 CbcPointer<DigestFetchState> tmpLock = fetch;
364
365 /* Repeat this loop until we're out of data OR the state changes */
366 /* (So keep going if the state has changed and we still have data */
367 do {
368 prevstate = fetch->state;
369
370 switch (fetch->state) {
371
372 case DIGEST_READ_REPLY:
373 retsize = peerDigestFetchReply(fetch, fetch->buf, fetch->bufofs);
374 break;
375
376 case DIGEST_READ_CBLOCK:
377 retsize = peerDigestSwapInCBlock(fetch, fetch->buf, fetch->bufofs);
378 break;
379
380 case DIGEST_READ_MASK:
381 retsize = peerDigestSwapInMask(fetch, fetch->buf, fetch->bufofs);
382 break;
383
384 case DIGEST_READ_NONE:
385 break;
386
387 default:
388 fatal("Bad digest transfer mode!\n");
389 }
390
391 if (retsize < 0)
392 return;
393
394 /*
395 * The returned size indicates how much of the buffer was read -
396 * so move the remainder of the buffer to the beginning
397 * and update the bufofs / bufsize
398 */
399 newsize = fetch->bufofs - retsize;
400
401 memmove(fetch->buf, fetch->buf + retsize, fetch->bufofs - newsize);
402
403 fetch->bufofs = newsize;
404
405 } while (cbdataReferenceValid(fetch) && prevstate != fetch->state && fetch->bufofs > 0);
406
407 // Check for EOF here, thus giving the parser one extra run. We could avoid this overhead by
408 // checking at the beginning of this function. However, in this case, we would have to require
409 // that the parser does not regard EOF as a special condition (it is true now but may change
410 // in the future).
411 if (fetch->sc->atEof()) {
412 finishAndDeleteFetch(fetch, "premature end of digest reply", true);
413 return;
414 }
415
416 /* Update the copy offset */
417 fetch->offset += receivedData.length;
418
419 /* Schedule another copy */
420 if (cbdataReferenceValid(fetch)) {
421 StoreIOBuffer tempBuffer;
422 tempBuffer.offset = fetch->offset;
423 tempBuffer.length = SM_PAGE_SIZE - fetch->bufofs;
424 tempBuffer.data = fetch->buf + fetch->bufofs;
425 storeClientCopy(fetch->sc, fetch->entry, tempBuffer,
426 peerDigestHandleReply, fetch);
427 }
428 }
429
430 /// handle HTTP response headers in the initial storeClientCopy() response
431 static int
432 peerDigestFetchReply(void *data, char *buf, ssize_t size)
433 {
434 DigestFetchState *fetch = (DigestFetchState *)data;
435 const auto pd = fetch->pd.get();
436 assert(pd && buf);
437 assert(!fetch->offset);
438
439 assert(fetch->state == DIGEST_READ_REPLY);
440
441 if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestFetchReply"))
442 return -1;
443
444 {
445 const auto &reply = fetch->entry->mem().freshestReply();
446 const auto status = reply.sline.status();
447 assert(status != Http::scNone);
448 debugs(72, 3, "peerDigestFetchReply: " << pd->host << " status: " << status <<
449 ", expires: " << (long int) reply.expires << " (" << std::showpos <<
450 (int) (reply.expires - squid_curtime) << ")");
451
452 /* this "if" is based on clientHandleIMSReply() */
453
454 if (status == Http::scNotModified) {
455 /* our old entry is fine */
456 assert(fetch->old_entry);
457
458 if (!fetch->old_entry->mem_obj->request)
459 fetch->old_entry->mem_obj->request = fetch->entry->mem_obj->request;
460
461 assert(fetch->old_entry->mem_obj->request);
462
463 if (!Store::Root().updateOnNotModified(fetch->old_entry, *fetch->entry)) {
464 finishAndDeleteFetch(fetch, "header update failure after a 304 response", true);
465 return -1;
466 }
467
468 /* get rid of 304 reply */
469 storeUnregister(fetch->sc, fetch->entry, fetch);
470
471 fetch->entry->unlock("peerDigestFetchReply 304");
472
473 fetch->entry = fetch->old_entry;
474
475 fetch->old_entry = nullptr;
476
477 /* preserve request -- we need its size to update counters */
478 /* requestUnlink(r); */
479 /* fetch->entry->mem_obj->request = nullptr; */
480
481 if (!fetch->pd->cd) {
482 finishAndDeleteFetch(fetch, "304 without the old in-memory digest", true);
483 return -1;
484 }
485
486 // stay with the old in-memory digest
487 finishAndDeleteFetch(fetch, "Not modified", false);
488 return -1;
489 } else if (status == Http::scOkay) {
490 /* get rid of old entry if any */
491
492 if (fetch->old_entry) {
493 debugs(72, 3, "peerDigestFetchReply: got new digest, releasing old one");
494 storeUnregister(fetch->old_sc, fetch->old_entry, fetch);
495 fetch->old_entry->releaseRequest();
496 fetch->old_entry->unlock("peerDigestFetchReply 200");
497 fetch->old_entry = nullptr;
498 }
499
500 fetch->state = DIGEST_READ_CBLOCK;
501 } else {
502 /* some kind of a bug */
503 finishAndDeleteFetch(fetch, reply.sline.reason(), true);
504 return -1; /* XXX -1 will abort stuff in ReadReply! */
505 }
506 }
507
508 return 0; // we consumed/used no buffered bytes
509 }
510
511 int
512 peerDigestSwapInCBlock(void *data, char *buf, ssize_t size)
513 {
514 DigestFetchState *fetch = (DigestFetchState *)data;
515
516 assert(fetch->state == DIGEST_READ_CBLOCK);
517
518 if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestSwapInCBlock"))
519 return -1;
520
521 if (size >= (ssize_t)StoreDigestCBlockSize) {
522 const auto pd = fetch->pd.get();
523
524 assert(pd);
525 assert(fetch->entry->mem_obj);
526
527 if (peerDigestSetCBlock(pd, buf)) {
528 /* XXX: soon we will have variable header size */
529 /* switch to CD buffer and fetch digest guts */
530 buf = nullptr;
531 assert(pd->cd->mask);
532 fetch->state = DIGEST_READ_MASK;
533 return StoreDigestCBlockSize;
534 } else {
535 finishAndDeleteFetch(fetch, "invalid digest cblock", true);
536 return -1;
537 }
538 }
539
540 /* need more data, do we have space? */
541 if (size >= SM_PAGE_SIZE) {
542 finishAndDeleteFetch(fetch, "digest cblock too big", true);
543 return -1;
544 }
545
546 return 0; /* We need more data */
547 }
548
549 int
550 peerDigestSwapInMask(void *data, char *buf, ssize_t size)
551 {
552 DigestFetchState *fetch = (DigestFetchState *)data;
553 const auto pd = fetch->pd.get();
554 assert(pd);
555 assert(pd->cd && pd->cd->mask);
556
557 /*
558 * NOTENOTENOTENOTENOTE: buf doesn't point to pd->cd->mask anymore!
559 * we need to do the copy ourselves!
560 */
561 memcpy(pd->cd->mask + fetch->mask_offset, buf, size);
562
563 /* NOTE! buf points to the middle of pd->cd->mask! */
564
565 if (peerDigestFetchedEnough(fetch, nullptr, size, "peerDigestSwapInMask"))
566 return -1;
567
568 fetch->mask_offset += size;
569
570 if (fetch->mask_offset >= pd->cd->mask_size) {
571 debugs(72, 2, "peerDigestSwapInMask: Done! Got " <<
572 fetch->mask_offset << ", expected " << pd->cd->mask_size);
573 assert(fetch->mask_offset == pd->cd->mask_size);
574 assert(peerDigestFetchedEnough(fetch, nullptr, 0, "peerDigestSwapInMask"));
575 return -1; /* XXX! */
576 }
577
578 /* We always read everything, so return size */
579 return size;
580 }
581
582 static int
583 peerDigestFetchedEnough(DigestFetchState * fetch, char *, ssize_t size, const char *step_name)
584 {
585 static const SBuf hostUnknown("<unknown>"); // peer host (if any)
586 SBuf host = hostUnknown;
587
588 const auto pd = fetch->pd.get();
589 Assure(pd);
590 const char *reason = nullptr; /* reason for completion */
591 const char *no_bug = nullptr; /* successful completion if set */
592
593 /* test possible exiting conditions (the same for most steps!)
594 * cases marked with '?!' should not happen */
595
596 if (!reason) {
597 host = pd->host;
598 }
599
600 debugs(72, 6, step_name << ": peer " << host << ", offset: " <<
601 fetch->offset << " size: " << size << ".");
602
603 /* continue checking (with pd and host known and valid) */
604
605 if (!reason) {
606 if (size < 0)
607 reason = "swap failure";
608 else if (!fetch->entry)
609 reason = "swap aborted?!";
610 else if (EBIT_TEST(fetch->entry->flags, ENTRY_ABORTED))
611 reason = "swap aborted";
612 }
613
614 /* continue checking (maybe-successful eof case) */
615 if (!reason && !size && fetch->state != DIGEST_READ_REPLY) {
616 if (!pd->cd)
617 reason = "null digest?!";
618 else if (fetch->mask_offset != pd->cd->mask_size)
619 reason = "premature end of digest?!";
620 else if (!peerDigestUseful(pd))
621 reason = "useless digest";
622 else
623 reason = no_bug = "success";
624 }
625
626 /* finish if we have a reason */
627 if (reason) {
628 const int level = strstr(reason, "?!") ? 1 : 3;
629 debugs(72, level, "" << step_name << ": peer " << host << ", exiting after '" << reason << "'");
630 finishAndDeleteFetch(fetch, reason, !no_bug);
631 }
632
633 return reason != nullptr;
634 }
635
636 /* complete the digest transfer, update stats, unlock/release everything */
637 static void
638 finishAndDeleteFetch(DigestFetchState * const fetch, const char * const reason, const bool err)
639 {
640 assert(reason);
641
642 debugs(72, 2, "peer: " << RawPointer(fetch->pd.valid() ? fetch->pd->peer : nullptr).orNil() << ", reason: " << reason << ", err: " << err);
643
644 /* note: order is significant */
645 peerDigestFetchSetStats(fetch);
646 if (const auto pd = fetch->pd.get())
647 pd->noteFetchFinished(*fetch, reason, err);
648
649 delete fetch;
650 }
651
652 void
653 PeerDigest::noteFetchFinished(const DigestFetchState &finishedFetch, const char * const outcomeDescription, const bool sawError)
654 {
655 const auto pd = this; // TODO: remove this diff reducer
656 const auto fetch = &finishedFetch; // TODO: remove this diff reducer
657
658 pd->flags.requested = false;
659 pd->req_result = outcomeDescription;
660
661 pd->times.received = squid_curtime;
662 pd->times.req_delay = fetch->resp_time;
663 pd->stats.sent.kbytes += fetch->sent.bytes;
664 pd->stats.recv.kbytes += fetch->recv.bytes;
665 pd->stats.sent.msgs += fetch->sent.msg;
666 pd->stats.recv.msgs += fetch->recv.msg;
667
668 if (sawError) {
669 debugs(72, DBG_IMPORTANT, "disabling (" << outcomeDescription << ") digest from " << host);
670
671 pd->times.retry_delay = peerDigestIncDelay(pd);
672 peerDigestSetCheck(pd, pd->times.retry_delay);
673 delete pd->cd;
674 pd->cd = nullptr;
675
676 pd->flags.usable = false;
677 } else {
678 pd->flags.usable = true;
679 pd->times.retry_delay = 0;
680 peerDigestSetCheck(pd, peerDigestNewDelay(fetch->entry));
681
682 /* XXX: ugly condition, but how? */
683
684 if (fetch->entry->store_status == STORE_OK)
685 debugs(72, 2, "re-used old digest from " << host);
686 else
687 debugs(72, 2, "received valid digest from " << host);
688 }
689 }
690
691 /* calculate fetch stats after completion */
692 static void
693 peerDigestFetchSetStats(DigestFetchState * fetch)
694 {
695 MemObject *mem;
696 assert(fetch->entry && fetch->request);
697
698 mem = fetch->entry->mem_obj;
699 assert(mem);
700
701 /* XXX: outgoing numbers are not precise */
702 /* XXX: we must distinguish between 304 hits and misses here */
703 fetch->sent.bytes = fetch->request->prefixLen();
704 /* XXX: this is slightly wrong: we don't KNOW that the entire memobject
705 * was fetched. We only know how big it is
706 */
707 fetch->recv.bytes = mem->size();
708 fetch->sent.msg = fetch->recv.msg = 1;
709 fetch->expires = fetch->entry->expires;
710 fetch->resp_time = squid_curtime - fetch->start_time;
711
712 debugs(72, 3, "peerDigestFetchFinish: recv " << fetch->recv.bytes <<
713 " bytes in " << (int) fetch->resp_time << " secs");
714
715 debugs(72, 3, "peerDigestFetchFinish: expires: " <<
716 (long int) fetch->expires << " (" << std::showpos <<
717 (int) (fetch->expires - squid_curtime) << "), lmt: " <<
718 std::noshowpos << (long int) fetch->entry->lastModified() << " (" <<
719 std::showpos << (int) (fetch->entry->lastModified() - squid_curtime) <<
720 ")");
721
722 statCounter.cd.kbytes_sent += fetch->sent.bytes;
723 statCounter.cd.kbytes_recv += fetch->recv.bytes;
724 statCounter.cd.msgs_sent += fetch->sent.msg;
725 statCounter.cd.msgs_recv += fetch->recv.msg;
726 }
727
728 static int
729 peerDigestSetCBlock(PeerDigest * pd, const char *buf)
730 {
731 StoreDigestCBlock cblock;
732 int freed_size = 0;
733 const auto host = pd->host;
734
735 memcpy(&cblock, buf, sizeof(cblock));
736 /* network -> host conversions */
737 cblock.ver.current = ntohs(cblock.ver.current);
738 cblock.ver.required = ntohs(cblock.ver.required);
739 cblock.capacity = ntohl(cblock.capacity);
740 cblock.count = ntohl(cblock.count);
741 cblock.del_count = ntohl(cblock.del_count);
742 cblock.mask_size = ntohl(cblock.mask_size);
743 debugs(72, 2, "got digest cblock from " << host << "; ver: " <<
744 (int) cblock.ver.current << " (req: " << (int) cblock.ver.required <<
745 ")");
746
747 debugs(72, 2, "\t size: " <<
748 cblock.mask_size << " bytes, e-cnt: " <<
749 cblock.count << ", e-util: " <<
750 xpercentInt(cblock.count, cblock.capacity) << "%" );
751 /* check version requirements (both ways) */
752
753 if (cblock.ver.required > CacheDigestVer.current) {
754 debugs(72, DBG_IMPORTANT, "" << host << " digest requires version " <<
755 cblock.ver.required << "; have: " << CacheDigestVer.current);
756
757 return 0;
758 }
759
760 if (cblock.ver.current < CacheDigestVer.required) {
761 debugs(72, DBG_IMPORTANT, "" << host << " digest is version " <<
762 cblock.ver.current << "; we require: " <<
763 CacheDigestVer.required);
764
765 return 0;
766 }
767
768 /* check consistency */
769 if (cblock.ver.required > cblock.ver.current ||
770 cblock.mask_size <= 0 || cblock.capacity <= 0 ||
771 cblock.bits_per_entry <= 0 || cblock.hash_func_count <= 0) {
772 debugs(72, DBG_CRITICAL, "" << host << " digest cblock is corrupted.");
773 return 0;
774 }
775
776 /* check consistency further */
777 if ((size_t)cblock.mask_size != CacheDigest::CalcMaskSize(cblock.capacity, cblock.bits_per_entry)) {
778 debugs(72, DBG_CRITICAL, host << " digest cblock is corrupted " <<
779 "(mask size mismatch: " << cblock.mask_size << " ? " <<
780 CacheDigest::CalcMaskSize(cblock.capacity, cblock.bits_per_entry)
781 << ").");
782 return 0;
783 }
784
785 /* there are some things we cannot do yet */
786 if (cblock.hash_func_count != CacheDigestHashFuncCount) {
787 debugs(72, DBG_CRITICAL, "ERROR: " << host << " digest: unsupported #hash functions: " <<
788 cblock.hash_func_count << " ? " << CacheDigestHashFuncCount << ".");
789 return 0;
790 }
791
792 /*
793 * no cblock bugs below this point
794 */
795 /* check size changes */
796 if (pd->cd && cblock.mask_size != (ssize_t)pd->cd->mask_size) {
797 debugs(72, 2, host << " digest changed size: " << cblock.mask_size <<
798 " -> " << pd->cd->mask_size);
799 freed_size = pd->cd->mask_size;
800 delete pd->cd;
801 pd->cd = nullptr;
802 }
803
804 if (!pd->cd) {
805 debugs(72, 2, "creating " << host << " digest; size: " << cblock.mask_size << " (" <<
806 std::showpos << (int) (cblock.mask_size - freed_size) << ") bytes");
807 pd->cd = new CacheDigest(cblock.capacity, cblock.bits_per_entry);
808
809 if (cblock.mask_size >= freed_size)
810 statCounter.cd.memory += (cblock.mask_size - freed_size);
811 }
812
813 assert(pd->cd);
814 /* these assignments leave us in an inconsistent state until we finish reading the digest */
815 pd->cd->count = cblock.count;
816 pd->cd->del_count = cblock.del_count;
817 return 1;
818 }
819
820 static int
821 peerDigestUseful(const PeerDigest * pd)
822 {
823 /* TODO: we should calculate the prob of a false hit instead of bit util */
824 const auto bit_util = pd->cd->usedMaskPercent();
825
826 if (bit_util > 65.0) {
827 debugs(72, DBG_CRITICAL, "WARNING: " << pd->host <<
828 " peer digest has too many bits on (" << bit_util << "%).");
829 return 0;
830 }
831
832 return 1;
833 }
834
835 static int
836 saneDiff(time_t diff)
837 {
838 return abs((int) diff) > squid_curtime / 2 ? 0 : diff;
839 }
840
841 void
842 peerDigestStatsReport(const PeerDigest * pd, StoreEntry * e)
843 {
844 #define f2s(flag) (pd->flags.flag ? "yes" : "no")
845 #define appendTime(tm) storeAppendPrintf(e, "%s\t %10ld\t %+d\t %+d\n", \
846 ""#tm, (long int)pd->times.tm, \
847 saneDiff(pd->times.tm - squid_curtime), \
848 saneDiff(pd->times.tm - pd->times.initialized))
849
850 assert(pd);
851
852 auto host = pd->host;
853 storeAppendPrintf(e, "\npeer digest from " SQUIDSBUFPH "\n", SQUIDSBUFPRINT(host));
854
855 cacheDigestGuessStatsReport(&pd->stats.guess, e, host);
856
857 storeAppendPrintf(e, "\nevent\t timestamp\t secs from now\t secs from init\n");
858 appendTime(initialized);
859 appendTime(needed);
860 appendTime(requested);
861 appendTime(received);
862 appendTime(next_check);
863
864 storeAppendPrintf(e, "peer digest state:\n");
865 storeAppendPrintf(e, "\tneeded: %3s, usable: %3s, requested: %3s\n",
866 f2s(needed), f2s(usable), f2s(requested));
867 storeAppendPrintf(e, "\n\tlast retry delay: %d secs\n",
868 (int) pd->times.retry_delay);
869 storeAppendPrintf(e, "\tlast request response time: %d secs\n",
870 (int) pd->times.req_delay);
871 storeAppendPrintf(e, "\tlast request result: %s\n",
872 pd->req_result ? pd->req_result : "(none)");
873
874 storeAppendPrintf(e, "\npeer digest traffic:\n");
875 storeAppendPrintf(e, "\trequests sent: %d, volume: %d KB\n",
876 pd->stats.sent.msgs, (int) pd->stats.sent.kbytes.kb);
877 storeAppendPrintf(e, "\treplies recv: %d, volume: %d KB\n",
878 pd->stats.recv.msgs, (int) pd->stats.recv.kbytes.kb);
879
880 storeAppendPrintf(e, "\npeer digest structure:\n");
881
882 if (pd->cd)
883 cacheDigestReport(pd->cd, host, e);
884 else
885 storeAppendPrintf(e, "\tno in-memory copy\n");
886 }
887
888 #endif
889