]> git.ipfire.org Git - thirdparty/squid.git/blob - src/peer_digest.cc
Updated copyright
[thirdparty/squid.git] / src / peer_digest.cc
1
2 /*
3 * $Id: peer_digest.cc,v 1.77 2001/01/12 00:37:20 wessels Exp $
4 *
5 * DEBUG: section 72 Peer Digest Routines
6 * AUTHOR: Alex Rousskov
7 *
8 * SQUID Web Proxy Cache http://www.squid-cache.org/
9 * ----------------------------------------------------------
10 *
11 * Squid is the result of efforts by numerous individuals from
12 * the Internet community; see the CONTRIBUTORS file for full
13 * details. Many organizations have provided support for Squid's
14 * development; see the SPONSORS file for full details. Squid is
15 * Copyrighted (C) 2001 by the Regents of the University of
16 * California; see the COPYRIGHT file for full details. Squid
17 * incorporates software developed and/or copyrighted by other
18 * sources; see the CREDITS file for full details.
19 *
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
24 *
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
29 *
30 * You should have received a copy of the GNU General Public License
31 * along with this program; if not, write to the Free Software
32 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
33 *
34 */
35
36 #include "squid.h"
37
38 #if USE_CACHE_DIGESTS
39
40 /* local types */
41
42 /* local prototypes */
43 static time_t peerDigestIncDelay(const PeerDigest * pd);
44 static time_t peerDigestNewDelay(const StoreEntry * e);
45 static void peerDigestSetCheck(PeerDigest * pd, time_t delay);
46 static void peerDigestClean(PeerDigest *);
47 static EVH peerDigestCheck;
48 static void peerDigestRequest(PeerDigest * pd);
49 static STCB peerDigestFetchReply;
50 static STCB peerDigestSwapInHeaders;
51 static STCB peerDigestSwapInCBlock;
52 static STCB peerDigestSwapInMask;
53 static int peerDigestFetchedEnough(DigestFetchState * fetch, char *buf, ssize_t size, const char *step_name);
54 static void peerDigestFetchStop(DigestFetchState * fetch, char *buf, const char *reason);
55 static void peerDigestFetchAbort(DigestFetchState * fetch, char *buf, const char *reason);
56 static void peerDigestReqFinish(DigestFetchState * fetch, char *buf, int, int, int, const char *reason, int err);
57 static void peerDigestPDFinish(DigestFetchState * fetch, int pcb_valid, int err);
58 static void peerDigestFetchFinish(DigestFetchState * fetch, int err);
59 static void peerDigestFetchSetStats(DigestFetchState * fetch);
60 static int peerDigestSetCBlock(PeerDigest * pd, const char *buf);
61 static int peerDigestUseful(const PeerDigest * pd);
62
63
64 /* local constants */
65
66 #define StoreDigestCBlockSize sizeof(StoreDigestCBlock)
67
68 /* min interval for requesting digests from a given peer */
69 static const time_t PeerDigestReqMinGap = 5 * 60; /* seconds */
70 /* min interval for requesting digests (cumulative request stream) */
71 static const time_t GlobDigestReqMinGap = 1 * 60; /* seconds */
72
73 /* local vars */
74
75 static time_t pd_last_req_time = 0; /* last call to Check */
76
77 /* initialize peer digest */
78 static void
79 peerDigestInit(PeerDigest * pd, peer * p)
80 {
81 assert(pd && p);
82
83 memset(pd, 0, sizeof(*pd));
84 pd->peer = p;
85 /* if peer disappears, we will know it's name */
86 stringInit(&pd->host, p->host);
87
88 pd->times.initialized = squid_curtime;
89 }
90
91 static void
92 peerDigestClean(PeerDigest * pd)
93 {
94 assert(pd);
95 if (pd->cd)
96 cacheDigestDestroy(pd->cd);
97 stringClean(&pd->host);
98 }
99
100 CBDATA_TYPE(PeerDigest);
101
102 /* allocate new peer digest, call Init, and lock everything */
103 PeerDigest *
104 peerDigestCreate(peer * p)
105 {
106 PeerDigest *pd;
107 assert(p);
108
109 CBDATA_INIT_TYPE(PeerDigest);
110 pd = CBDATA_ALLOC(PeerDigest, NULL);
111 peerDigestInit(pd, p);
112 cbdataLock(pd->peer); /* we will use the peer */
113
114 return pd;
115 }
116
117 /* call Clean and free/unlock everything */
118 void
119 peerDigestDestroy(PeerDigest * pd)
120 {
121 peer *p;
122 assert(pd);
123
124 p = pd->peer;
125 pd->peer = NULL;
126 /* inform peer (if any) that we are gone */
127 if (cbdataValid(p))
128 peerNoteDigestGone(p);
129 cbdataUnlock(p); /* must unlock, valid or not */
130
131 peerDigestClean(pd);
132 cbdataFree(pd);
133 }
134
135 /* called by peer to indicate that somebody actually needs this digest */
136 void
137 peerDigestNeeded(PeerDigest * pd)
138 {
139 assert(pd);
140 assert(!pd->flags.needed);
141 assert(!pd->cd);
142
143 pd->flags.needed = 1;
144 pd->times.needed = squid_curtime;
145 peerDigestSetCheck(pd, 0); /* check asap */
146 }
147
148 /* currently we do not have a reason to disable without destroying */
149 #if FUTURE_CODE
150 /* disables peer for good */
151 static void
152 peerDigestDisable(PeerDigest * pd)
153 {
154 debug(72, 2) ("peerDigestDisable: peer %s disabled for good\n",
155 strBuf(pd->host));
156 pd->times.disabled = squid_curtime;
157 pd->times.next_check = -1; /* never */
158 pd->flags.usable = 0;
159
160 if (pd->cd) {
161 cacheDigestDestroy(pd->cd);
162 pd->cd = NULL;
163 }
164 /* we do not destroy the pd itself to preserve its "history" and stats */
165 }
166 #endif
167
168 /* increment retry delay [after an unsuccessful attempt] */
169 static time_t
170 peerDigestIncDelay(const PeerDigest * pd)
171 {
172 assert(pd);
173 return pd->times.retry_delay > 0 ?
174 2 * pd->times.retry_delay : /* exponential backoff */
175 PeerDigestReqMinGap; /* minimal delay */
176 }
177
178 /* artificially increases Expires: setting to avoid race conditions
179 * returns the delay till that [increased] expiration time */
180 static time_t
181 peerDigestNewDelay(const StoreEntry * e)
182 {
183 assert(e);
184 if (e->expires > 0)
185 return e->expires + PeerDigestReqMinGap - squid_curtime;
186 return PeerDigestReqMinGap;
187 }
188
189 /* registers next digest verification */
190 static void
191 peerDigestSetCheck(PeerDigest * pd, time_t delay)
192 {
193 eventAdd("peerDigestCheck", peerDigestCheck, pd, (double) delay, 1);
194 pd->times.next_check = squid_curtime + delay;
195 debug(72, 3) ("peerDigestSetCheck: will check peer %s in %d secs\n",
196 strBuf(pd->host), delay);
197 }
198
199 /*
200 * called when peer is about to disappear or have already disappeared
201 */
202 void
203 peerDigestNotePeerGone(PeerDigest * pd)
204 {
205 if (pd->flags.requested) {
206 debug(72, 2) ("peerDigest: peer %s gone, will destroy after fetch.\n",
207 strBuf(pd->host));
208 /* do nothing now, the fetching chain will notice and take action */
209 } else {
210 debug(72, 2) ("peerDigest: peer %s is gone, destroying now.\n",
211 strBuf(pd->host));
212 peerDigestDestroy(pd);
213 }
214 }
215
216 /* callback for eventAdd() (with peer digest locked)
217 * request new digest if our copy is too old or if we lack one;
218 * schedule next check otherwise */
219 static void
220 peerDigestCheck(void *data)
221 {
222 PeerDigest *pd = data;
223 time_t req_time;
224
225 /*
226 * you can't assert(cbdataValid(pd)) -- if its not valid this
227 * function never gets called
228 */
229 assert(!pd->flags.requested);
230
231 pd->times.next_check = 0; /* unknown */
232
233 if (!cbdataValid(pd->peer)) {
234 peerDigestNotePeerGone(pd);
235 return;
236 }
237 debug(72, 3) ("peerDigestCheck: peer %s:%d\n", pd->peer->host, pd->peer->http_port);
238 debug(72, 3) ("peerDigestCheck: time: %d, last received: %d (%+d)\n",
239 squid_curtime, pd->times.received, (squid_curtime - pd->times.received));
240
241 /* decide when we should send the request:
242 * request now unless too close to other requests */
243 req_time = squid_curtime;
244
245 /* per-peer limit */
246 if (req_time - pd->times.received < PeerDigestReqMinGap) {
247 debug(72, 2) ("peerDigestCheck: %s, avoiding close peer requests (%d < %d secs).\n",
248 strBuf(pd->host), req_time - pd->times.received,
249 PeerDigestReqMinGap);
250 req_time = pd->times.received + PeerDigestReqMinGap;
251 }
252 /* global limit */
253 if (req_time - pd_last_req_time < GlobDigestReqMinGap) {
254 debug(72, 2) ("peerDigestCheck: %s, avoiding close requests (%d < %d secs).\n",
255 strBuf(pd->host), req_time - pd_last_req_time,
256 GlobDigestReqMinGap);
257 req_time = pd_last_req_time + GlobDigestReqMinGap;
258 }
259 if (req_time <= squid_curtime)
260 peerDigestRequest(pd); /* will set pd->flags.requested */
261 else
262 peerDigestSetCheck(pd, req_time - squid_curtime);
263 }
264
265 /* ask store for a digest */
266 static void
267 peerDigestRequest(PeerDigest * pd)
268 {
269 peer *p = pd->peer;
270 StoreEntry *e, *old_e;
271 char *url;
272 const cache_key *key;
273 request_t *req;
274 DigestFetchState *fetch = NULL;
275
276 pd->req_result = NULL;
277 pd->flags.requested = 1;
278
279 /* compute future request components */
280 if (p->digest_url)
281 url = xstrdup(p->digest_url);
282 else
283 url = internalRemoteUri(p->host, p->http_port,
284 "/squid-internal-periodic/", StoreDigestFileName);
285
286 key = storeKeyPublic(url, METHOD_GET);
287 debug(72, 2) ("peerDigestRequest: %s key: %s\n", url, storeKeyText(key));
288 req = urlParse(METHOD_GET, url);
289 assert(req);
290
291 /* add custom headers */
292 assert(!req->header.len);
293 httpHeaderPutStr(&req->header, HDR_ACCEPT, StoreDigestMimeStr);
294 httpHeaderPutStr(&req->header, HDR_ACCEPT, "text/html");
295 if (p->login)
296 xstrncpy(req->login, p->login, MAX_LOGIN_SZ);
297 /* create fetch state structure */
298 fetch = CBDATA_ALLOC(DigestFetchState, NULL);
299 fetch->request = requestLink(req);
300 fetch->pd = pd;
301 fetch->offset = 0;
302
303 /* update timestamps */
304 fetch->start_time = squid_curtime;
305 pd->times.requested = squid_curtime;
306 pd_last_req_time = squid_curtime;
307
308 req->flags.cachable = 1;
309 /* the rest is based on clientProcessExpired() */
310 req->flags.refresh = 1;
311 old_e = fetch->old_entry = storeGet(key);
312 if (old_e) {
313 debug(72, 5) ("peerDigestRequest: found old entry\n");
314 storeLockObject(old_e);
315 storeCreateMemObject(old_e, url, url);
316 fetch->old_sc = storeClientListAdd(old_e, fetch);
317 }
318 e = fetch->entry = storeCreateEntry(url, url, req->flags, req->method);
319 assert(EBIT_TEST(e->flags, KEY_PRIVATE));
320 fetch->sc = storeClientListAdd(e, fetch);
321 /* set lastmod to trigger IMS request if possible */
322 if (old_e)
323 e->lastmod = old_e->lastmod;
324
325 /* push towards peer cache */
326 debug(72, 3) ("peerDigestRequest: forwarding to fwdStart...\n");
327 fwdStart(-1, e, req);
328 cbdataLock(fetch);
329 cbdataLock(fetch->pd);
330 storeClientCopy(fetch->sc, e, 0, 0, 4096, memAllocate(MEM_4K_BUF),
331 peerDigestFetchReply, fetch);
332 }
333
334 /* wait for full http headers to be received then parse them */
335 static void
336 peerDigestFetchReply(void *data, char *buf, ssize_t size)
337 {
338 DigestFetchState *fetch = data;
339 PeerDigest *pd = fetch->pd;
340 size_t hdr_size;
341 assert(pd && buf);
342 assert(!fetch->offset);
343
344 if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestFetchReply"))
345 return;
346
347 if ((hdr_size = headersEnd(buf, size))) {
348 http_status status;
349 HttpReply *reply = fetch->entry->mem_obj->reply;
350 assert(reply);
351 httpReplyParse(reply, buf, hdr_size);
352 status = reply->sline.status;
353 debug(72, 3) ("peerDigestFetchReply: %s status: %d, expires: %d (%+d)\n",
354 strBuf(pd->host), status,
355 reply->expires, reply->expires - squid_curtime);
356
357 /* this "if" is based on clientHandleIMSReply() */
358 if (status == HTTP_NOT_MODIFIED) {
359 request_t *r = NULL;
360 /* our old entry is fine */
361 assert(fetch->old_entry);
362 if (!fetch->old_entry->mem_obj->request)
363 fetch->old_entry->mem_obj->request = r =
364 requestLink(fetch->entry->mem_obj->request);
365 assert(fetch->old_entry->mem_obj->request);
366 httpReplyUpdateOnNotModified(fetch->old_entry->mem_obj->reply, reply);
367 storeTimestampsSet(fetch->old_entry);
368 /* get rid of 304 reply */
369 storeUnregister(fetch->sc, fetch->entry, fetch);
370 storeUnlockObject(fetch->entry);
371 fetch->entry = fetch->old_entry;
372 fetch->old_entry = NULL;
373 /* preserve request -- we need its size to update counters */
374 /* requestUnlink(r); */
375 /* fetch->entry->mem_obj->request = NULL; */
376 } else if (status == HTTP_OK) {
377 /* get rid of old entry if any */
378 if (fetch->old_entry) {
379 debug(72, 3) ("peerDigestFetchReply: got new digest, releasing old one\n");
380 storeUnregister(fetch->old_sc, fetch->old_entry, fetch);
381 storeReleaseRequest(fetch->old_entry);
382 storeUnlockObject(fetch->old_entry);
383 fetch->old_entry = NULL;
384 }
385 } else {
386 /* some kind of a bug */
387 peerDigestFetchAbort(fetch, buf, httpStatusLineReason(&reply->sline));
388 return;
389 }
390 /* must have a ready-to-use store entry if we got here */
391 /* can we stay with the old in-memory digest? */
392 if (status == HTTP_NOT_MODIFIED && fetch->pd->cd)
393 peerDigestFetchStop(fetch, buf, "Not modified");
394 else
395 storeClientCopy(fetch->sc, fetch->entry, /* have to swap in */
396 0, 0, SM_PAGE_SIZE, buf, peerDigestSwapInHeaders, fetch);
397 } else {
398 /* need more data, do we have space? */
399 if (size >= SM_PAGE_SIZE)
400 peerDigestFetchAbort(fetch, buf, "reply header too big");
401 else
402 storeClientCopy(fetch->sc, fetch->entry, size, 0, SM_PAGE_SIZE, buf,
403 peerDigestFetchReply, fetch);
404 }
405 }
406
407 /* fetch headers from disk, pass on to SwapInCBlock */
408 static void
409 peerDigestSwapInHeaders(void *data, char *buf, ssize_t size)
410 {
411 DigestFetchState *fetch = data;
412 size_t hdr_size;
413
414 if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestSwapInHeaders"))
415 return;
416
417 assert(!fetch->offset);
418 if ((hdr_size = headersEnd(buf, size))) {
419 assert(fetch->entry->mem_obj->reply);
420 if (!fetch->entry->mem_obj->reply->sline.status)
421 httpReplyParse(fetch->entry->mem_obj->reply, buf, hdr_size);
422 if (fetch->entry->mem_obj->reply->sline.status != HTTP_OK) {
423 debug(72, 1) ("peerDigestSwapInHeaders: %s status %d got cached!\n",
424 strBuf(fetch->pd->host), fetch->entry->mem_obj->reply->sline.status);
425 peerDigestFetchAbort(fetch, buf, "internal status error");
426 return;
427 }
428 fetch->offset += hdr_size;
429 storeClientCopy(fetch->sc, fetch->entry, size, fetch->offset,
430 SM_PAGE_SIZE, buf,
431 peerDigestSwapInCBlock, fetch);
432 } else {
433 /* need more data, do we have space? */
434 if (size >= SM_PAGE_SIZE)
435 peerDigestFetchAbort(fetch, buf, "stored header too big");
436 else
437 storeClientCopy(fetch->sc, fetch->entry, size, 0, SM_PAGE_SIZE, buf,
438 peerDigestSwapInHeaders, fetch);
439 }
440 }
441
442 static void
443 peerDigestSwapInCBlock(void *data, char *buf, ssize_t size)
444 {
445 DigestFetchState *fetch = data;
446
447 if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestSwapInCBlock"))
448 return;
449
450 if (size >= StoreDigestCBlockSize) {
451 PeerDigest *pd = fetch->pd;
452 HttpReply *rep = fetch->entry->mem_obj->reply;
453 const int seen = fetch->offset + size;
454
455 assert(pd && rep);
456 if (peerDigestSetCBlock(pd, buf)) {
457 /* XXX: soon we will have variable header size */
458 fetch->offset += StoreDigestCBlockSize;
459 /* switch to CD buffer and fetch digest guts */
460 memFree(buf, MEM_4K_BUF);
461 buf = NULL;
462 assert(pd->cd->mask);
463 storeClientCopy(fetch->sc, fetch->entry,
464 seen,
465 fetch->offset,
466 pd->cd->mask_size,
467 pd->cd->mask,
468 peerDigestSwapInMask, fetch);
469 } else {
470 peerDigestFetchAbort(fetch, buf, "invalid digest cblock");
471 }
472 } else {
473 /* need more data, do we have space? */
474 if (size >= SM_PAGE_SIZE)
475 peerDigestFetchAbort(fetch, buf, "digest cblock too big");
476 else
477 storeClientCopy(fetch->sc, fetch->entry, size, 0, SM_PAGE_SIZE, buf,
478 peerDigestSwapInCBlock, fetch);
479 }
480 }
481
482 static void
483 peerDigestSwapInMask(void *data, char *buf, ssize_t size)
484 {
485 DigestFetchState *fetch = data;
486 PeerDigest *pd;
487
488 /* NOTE! buf points to the middle of pd->cd->mask! */
489 if (peerDigestFetchedEnough(fetch, NULL, size, "peerDigestSwapInMask"))
490 return;
491
492 pd = fetch->pd;
493 assert(pd->cd && pd->cd->mask);
494
495 fetch->offset += size;
496 fetch->mask_offset += size;
497 if (fetch->mask_offset >= pd->cd->mask_size) {
498 debug(72, 2) ("peerDigestSwapInMask: Done! Got %d, expected %d\n",
499 fetch->mask_offset, pd->cd->mask_size);
500 assert(fetch->mask_offset == pd->cd->mask_size);
501 assert(peerDigestFetchedEnough(fetch, NULL, 0, "peerDigestSwapInMask"));
502 } else {
503 const size_t buf_sz = pd->cd->mask_size - fetch->mask_offset;
504 assert(buf_sz > 0);
505 storeClientCopy(fetch->sc, fetch->entry,
506 fetch->offset,
507 fetch->offset,
508 buf_sz,
509 pd->cd->mask + fetch->mask_offset,
510 peerDigestSwapInMask, fetch);
511 }
512 }
513
514 static int
515 peerDigestFetchedEnough(DigestFetchState * fetch, char *buf, ssize_t size, const char *step_name)
516 {
517 PeerDigest *pd = NULL;
518 const char *host = "<unknown>"; /* peer host */
519 const char *reason = NULL; /* reason for completion */
520 const char *no_bug = NULL; /* successful completion if set */
521 const int fcb_valid = cbdataValid(fetch);
522 const int pdcb_valid = fcb_valid && cbdataValid(fetch->pd);
523 const int pcb_valid = pdcb_valid && cbdataValid(fetch->pd->peer);
524
525 /* test possible exiting conditions (the same for most steps!)
526 * cases marked with '?!' should not happen */
527
528 if (!reason) {
529 if (!fcb_valid)
530 reason = "fetch aborted?!";
531 else if (!(pd = fetch->pd))
532 reason = "peer digest disappeared?!";
533 #if DONT
534 else if (!cbdataValid(pd))
535 reason = "invalidated peer digest?!";
536 #endif
537 else
538 host = strBuf(pd->host);
539 }
540 debug(72, 6) ("%s: peer %s, offset: %d size: %d.\n",
541 step_name, host, fcb_valid ? fetch->offset : -1, size);
542
543 /* continue checking (with pd and host known and valid) */
544 if (!reason) {
545 if (!cbdataValid(pd->peer))
546 reason = "peer disappeared";
547 else if (size < 0)
548 reason = "swap failure";
549 else if (!fetch->entry)
550 reason = "swap aborted?!";
551 else if (EBIT_TEST(fetch->entry->flags, ENTRY_ABORTED))
552 reason = "swap aborted";
553 }
554 /* continue checking (maybe-successful eof case) */
555 if (!reason && !size) {
556 if (!pd->cd)
557 reason = "null digest?!";
558 else if (fetch->mask_offset != pd->cd->mask_size)
559 reason = "premature end of digest?!";
560 else if (!peerDigestUseful(pd))
561 reason = "useless digest";
562 else
563 reason = no_bug = "success";
564 }
565 /* finish if we have a reason */
566 if (reason) {
567 const int level = strstr(reason, "?!") ? 1 : 3;
568 debug(72, level) ("%s: peer %s, exiting after '%s'\n",
569 step_name, host, reason);
570 peerDigestReqFinish(fetch, buf,
571 fcb_valid, pdcb_valid, pcb_valid, reason, !no_bug);
572 } else {
573 /* paranoid check */
574 assert(fcb_valid && pdcb_valid && pcb_valid);
575 }
576 return reason != NULL;
577 }
578
579 /* call this when all callback data is valid and fetch must be stopped but
580 * no error has occurred (e.g. we received 304 reply and reuse old digest) */
581 static void
582 peerDigestFetchStop(DigestFetchState * fetch, char *buf, const char *reason)
583 {
584 assert(reason);
585 debug(72, 2) ("peerDigestFetchStop: peer %s, reason: %s\n",
586 strBuf(fetch->pd->host), reason);
587 peerDigestReqFinish(fetch, buf, 1, 1, 1, reason, 0);
588 }
589
590 /* call this when all callback data is valid but something bad happened */
591 static void
592 peerDigestFetchAbort(DigestFetchState * fetch, char *buf, const char *reason)
593 {
594 assert(reason);
595 debug(72, 2) ("peerDigestFetchAbort: peer %s, reason: %s\n",
596 strBuf(fetch->pd->host), reason);
597 peerDigestReqFinish(fetch, buf, 1, 1, 1, reason, 1);
598 }
599
600 /* complete the digest transfer, update stats, unlock/release everything */
601 static void
602 peerDigestReqFinish(DigestFetchState * fetch, char *buf,
603 int fcb_valid, int pdcb_valid, int pcb_valid,
604 const char *reason, int err)
605 {
606 assert(reason);
607
608 /* must go before peerDigestPDFinish */
609 if (pdcb_valid) {
610 fetch->pd->flags.requested = 0;
611 fetch->pd->req_result = reason;
612 }
613 /* schedule next check if peer is still out there */
614 if (pcb_valid) {
615 PeerDigest *pd = fetch->pd;
616 if (err) {
617 pd->times.retry_delay = peerDigestIncDelay(pd);
618 peerDigestSetCheck(pd, pd->times.retry_delay);
619 } else {
620 pd->times.retry_delay = 0;
621 peerDigestSetCheck(pd, peerDigestNewDelay(fetch->entry));
622 }
623 }
624 /* note: order is significant */
625 if (fcb_valid)
626 peerDigestFetchSetStats(fetch);
627 if (pdcb_valid)
628 peerDigestPDFinish(fetch, pcb_valid, err);
629 if (fcb_valid)
630 peerDigestFetchFinish(fetch, err);
631 if (buf)
632 memFree(buf, MEM_4K_BUF);
633 }
634
635
636 /* destroys digest if peer disappeared
637 * must be called only when fetch and pd cbdata are valid */
638 static void
639 peerDigestPDFinish(DigestFetchState * fetch, int pcb_valid, int err)
640 {
641 PeerDigest *pd = fetch->pd;
642 const char *host = strBuf(pd->host);
643
644 pd->times.received = squid_curtime;
645 pd->times.req_delay = fetch->resp_time;
646 kb_incr(&pd->stats.sent.kbytes, (size_t) fetch->sent.bytes);
647 kb_incr(&pd->stats.recv.kbytes, (size_t) fetch->recv.bytes);
648 pd->stats.sent.msgs += fetch->sent.msg;
649 pd->stats.recv.msgs += fetch->recv.msg;
650
651 if (err) {
652 debug(72, 1) ("%sdisabling (%s) digest from %s\n",
653 pcb_valid ? "temporary " : "",
654 pd->req_result, host);
655
656 if (pd->cd) {
657 cacheDigestDestroy(pd->cd);
658 pd->cd = NULL;
659 }
660 pd->flags.usable = 0;
661
662 if (!pcb_valid)
663 peerDigestNotePeerGone(pd);
664 } else {
665 assert(pcb_valid);
666
667 pd->flags.usable = 1;
668
669 /* XXX: ugly condition, but how? */
670 if (fetch->entry->store_status == STORE_OK)
671 debug(72, 2) ("re-used old digest from %s\n", host);
672 else
673 debug(72, 2) ("received valid digest from %s\n", host);
674 }
675 fetch->pd = NULL;
676 cbdataUnlock(pd);
677 }
678
679 /* free fetch state structures
680 * must be called only when fetch cbdata is valid */
681 static void
682 peerDigestFetchFinish(DigestFetchState * fetch, int err)
683 {
684 assert(fetch->entry && fetch->request);
685
686 if (fetch->old_entry) {
687 debug(72, 2) ("peerDigestFetchFinish: deleting old entry\n");
688 storeUnregister(fetch->sc, fetch->old_entry, fetch);
689 storeReleaseRequest(fetch->old_entry);
690 storeUnlockObject(fetch->old_entry);
691 fetch->old_entry = NULL;
692 }
693 /* update global stats */
694 kb_incr(&statCounter.cd.kbytes_sent, (size_t) fetch->sent.bytes);
695 kb_incr(&statCounter.cd.kbytes_recv, (size_t) fetch->recv.bytes);
696 statCounter.cd.msgs_sent += fetch->sent.msg;
697 statCounter.cd.msgs_recv += fetch->recv.msg;
698
699 /* unlock everything */
700 storeUnregister(fetch->sc, fetch->entry, fetch);
701 storeUnlockObject(fetch->entry);
702 requestUnlink(fetch->request);
703 fetch->entry = NULL;
704 fetch->request = NULL;
705 assert(fetch->pd == NULL);
706 cbdataUnlock(fetch);
707 cbdataFree(fetch);
708 }
709
710 /* calculate fetch stats after completion */
711 static void
712 peerDigestFetchSetStats(DigestFetchState * fetch)
713 {
714 MemObject *mem;
715 assert(fetch->entry && fetch->request);
716
717 mem = fetch->entry->mem_obj;
718 assert(mem);
719
720 /* XXX: outgoing numbers are not precise */
721 /* XXX: we must distinguish between 304 hits and misses here */
722 fetch->sent.bytes = httpRequestPrefixLen(fetch->request);
723 fetch->recv.bytes = fetch->entry->store_status == STORE_PENDING ?
724 mem->inmem_hi : mem->object_sz;
725 fetch->sent.msg = fetch->recv.msg = 1;
726 fetch->expires = fetch->entry->expires;
727 fetch->resp_time = squid_curtime - fetch->start_time;
728
729 debug(72, 3) ("peerDigestFetchFinish: recv %d bytes in %d secs\n",
730 fetch->recv.bytes, fetch->resp_time);
731 debug(72, 3) ("peerDigestFetchFinish: expires: %d (%+d), lmt: %d (%+d)\n",
732 fetch->expires, fetch->expires - squid_curtime,
733 fetch->entry->lastmod, fetch->entry->lastmod - squid_curtime);
734 }
735
736
737 static int
738 peerDigestSetCBlock(PeerDigest * pd, const char *buf)
739 {
740 StoreDigestCBlock cblock;
741 int freed_size = 0;
742 const char *host = strBuf(pd->host);
743
744 xmemcpy(&cblock, buf, sizeof(cblock));
745 /* network -> host conversions */
746 cblock.ver.current = ntohs(cblock.ver.current);
747 cblock.ver.required = ntohs(cblock.ver.required);
748 cblock.capacity = ntohl(cblock.capacity);
749 cblock.count = ntohl(cblock.count);
750 cblock.del_count = ntohl(cblock.del_count);
751 cblock.mask_size = ntohl(cblock.mask_size);
752 debug(72, 2) ("got digest cblock from %s; ver: %d (req: %d)\n",
753 host, (int) cblock.ver.current, (int) cblock.ver.required);
754 debug(72, 2) ("\t size: %d bytes, e-cnt: %d, e-util: %d%%\n",
755 cblock.mask_size, cblock.count,
756 xpercentInt(cblock.count, cblock.capacity));
757 /* check version requirements (both ways) */
758 if (cblock.ver.required > CacheDigestVer.current) {
759 debug(72, 1) ("%s digest requires version %d; have: %d\n",
760 host, cblock.ver.required, CacheDigestVer.current);
761 return 0;
762 }
763 if (cblock.ver.current < CacheDigestVer.required) {
764 debug(72, 1) ("%s digest is version %d; we require: %d\n",
765 host, cblock.ver.current, CacheDigestVer.required);
766 return 0;
767 }
768 /* check consistency */
769 if (cblock.ver.required > cblock.ver.current ||
770 cblock.mask_size <= 0 || cblock.capacity <= 0 ||
771 cblock.bits_per_entry <= 0 || cblock.hash_func_count <= 0) {
772 debug(72, 0) ("%s digest cblock is corrupted.\n", host);
773 return 0;
774 }
775 /* check consistency further */
776 if (cblock.mask_size != cacheDigestCalcMaskSize(cblock.capacity, cblock.bits_per_entry)) {
777 debug(72, 0) ("%s digest cblock is corrupted (mask size mismatch: %d ? %d).\n",
778 host, cblock.mask_size, cacheDigestCalcMaskSize(cblock.capacity, cblock.bits_per_entry));
779 return 0;
780 }
781 /* there are some things we cannot do yet */
782 if (cblock.hash_func_count != CacheDigestHashFuncCount) {
783 debug(72, 0) ("%s digest: unsupported #hash functions: %d ? %d.\n",
784 host, cblock.hash_func_count, CacheDigestHashFuncCount);
785 return 0;
786 }
787 /*
788 * no cblock bugs below this point
789 */
790 /* check size changes */
791 if (pd->cd && cblock.mask_size != pd->cd->mask_size) {
792 debug(72, 2) ("%s digest changed size: %d -> %d\n",
793 host, cblock.mask_size, pd->cd->mask_size);
794 freed_size = pd->cd->mask_size;
795 cacheDigestDestroy(pd->cd);
796 pd->cd = NULL;
797 }
798 if (!pd->cd) {
799 debug(72, 2) ("creating %s digest; size: %d (%+d) bytes\n",
800 host, cblock.mask_size, (int) (cblock.mask_size - freed_size));
801 pd->cd = cacheDigestCreate(cblock.capacity, cblock.bits_per_entry);
802 if (cblock.mask_size >= freed_size)
803 kb_incr(&statCounter.cd.memory, cblock.mask_size - freed_size);
804 }
805 assert(pd->cd);
806 /* these assignments leave us in an inconsistent state until we finish reading the digest */
807 pd->cd->count = cblock.count;
808 pd->cd->del_count = cblock.del_count;
809 return 1;
810 }
811
812 static int
813 peerDigestUseful(const PeerDigest * pd)
814 {
815 /* TODO: we should calculate the prob of a false hit instead of bit util */
816 const int bit_util = cacheDigestBitUtil(pd->cd);
817 if (bit_util > 65) {
818 debug(72, 0) ("Warning: %s peer digest has too many bits on (%d%%).\n",
819 strBuf(pd->host), bit_util);
820 return 0;
821 }
822 return 1;
823 }
824
825 static int
826 saneDiff(time_t diff)
827 {
828 return abs(diff) > squid_curtime / 2 ? 0 : diff;
829 }
830
831 void
832 peerDigestStatsReport(const PeerDigest * pd, StoreEntry * e)
833 {
834 #define f2s(flag) (pd->flags.flag ? "yes" : "no")
835 #define appendTime(tm) storeAppendPrintf(e, "%s\t %10d\t %+d\t %+d\n", \
836 ""#tm, pd->times.tm, \
837 saneDiff(pd->times.tm - squid_curtime), \
838 saneDiff(pd->times.tm - pd->times.initialized))
839
840 const char *host = pd ? strBuf(pd->host) : NULL;
841 assert(pd);
842
843 storeAppendPrintf(e, "\npeer digest from %s\n", host);
844
845 cacheDigestGuessStatsReport(&pd->stats.guess, e, host);
846
847 storeAppendPrintf(e, "\nevent\t timestamp\t secs from now\t secs from init\n");
848 appendTime(initialized);
849 appendTime(needed);
850 appendTime(requested);
851 appendTime(received);
852 appendTime(next_check);
853
854 storeAppendPrintf(e, "peer digest state:\n");
855 storeAppendPrintf(e, "\tneeded: %3s, usable: %3s, requested: %3s\n",
856 f2s(needed), f2s(usable), f2s(requested));
857 storeAppendPrintf(e, "\n\tlast retry delay: %d secs\n",
858 pd->times.retry_delay);
859 storeAppendPrintf(e, "\tlast request response time: %d secs\n",
860 pd->times.req_delay);
861 storeAppendPrintf(e, "\tlast request result: %s\n",
862 pd->req_result ? pd->req_result : "(none)");
863
864 storeAppendPrintf(e, "\npeer digest traffic:\n");
865 storeAppendPrintf(e, "\trequests sent: %d, volume: %d KB\n",
866 pd->stats.sent.msgs, (int) pd->stats.sent.kbytes.kb);
867 storeAppendPrintf(e, "\treplies recv: %d, volume: %d KB\n",
868 pd->stats.recv.msgs, (int) pd->stats.recv.kbytes.kb);
869
870 storeAppendPrintf(e, "\npeer digest structure:\n");
871 if (pd->cd)
872 cacheDigestReport(pd->cd, host, e);
873 else
874 storeAppendPrintf(e, "\tno in-memory copy\n");
875 }
876
877 #endif