]> git.ipfire.org Git - thirdparty/squid.git/blame - src/peer_digest.cc
An ICAP server is allowed to list multiple methods in an options
[thirdparty/squid.git] / src / peer_digest.cc
CommitLineData
9b7de833 1
2/*
e11fe29a 3 * $Id: peer_digest.cc,v 1.108 2006/01/19 22:10:55 wessels Exp $
9b7de833 4 *
5 * DEBUG: section 72 Peer Digest Routines
6 * AUTHOR: Alex Rousskov
7 *
2b6662ba 8 * SQUID Web Proxy Cache http://www.squid-cache.org/
e25c139f 9 * ----------------------------------------------------------
9b7de833 10 *
2b6662ba 11 * Squid is the result of efforts by numerous individuals from
12 * the Internet community; see the CONTRIBUTORS file for full
13 * details. Many organizations have provided support for Squid's
14 * development; see the SPONSORS file for full details. Squid is
15 * Copyrighted (C) 2001 by the Regents of the University of
16 * California; see the COPYRIGHT file for full details. Squid
17 * incorporates software developed and/or copyrighted by other
18 * sources; see the CREDITS file for full details.
9b7de833 19 *
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
24 *
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
29 *
30 * You should have received a copy of the GNU General Public License
31 * along with this program; if not, write to the Free Software
cbdec147 32 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
e25c139f 33 *
9b7de833 34 */
35
36#include "squid.h"
6cfa8966 37#if USE_CACHE_DIGESTS
7a2e5bb5 38
528b2c61 39#include "Store.h"
40#include "HttpRequest.h"
41#include "HttpReply.h"
42#include "MemObject.h"
598465f1 43#include "StoreClient.h"
2225feb6 44#include "forward.h"
598465f1 45
9b7de833 46/* local types */
47
48/* local prototypes */
e13ee7ad 49static time_t peerDigestIncDelay(const PeerDigest * pd);
50static time_t peerDigestNewDelay(const StoreEntry * e);
51static void peerDigestSetCheck(PeerDigest * pd, time_t delay);
eb16313f 52static void peerDigestClean(PeerDigest *);
e13ee7ad 53static EVH peerDigestCheck;
54static void peerDigestRequest(PeerDigest * pd);
add2192d 55static STCB peerDigestHandleReply;
3dfaa3d2 56static int peerDigestFetchReply(void *, char *, ssize_t);
1f140227 57int peerDigestSwapInHeaders(void *, char *, ssize_t);
58int peerDigestSwapInCBlock(void *, char *, ssize_t);
59int peerDigestSwapInMask(void *, char *, ssize_t);
4b4cd312 60static int peerDigestFetchedEnough(DigestFetchState * fetch, char *buf, ssize_t size, const char *step_name);
551e60a9 61static void peerDigestFetchStop(DigestFetchState * fetch, char *buf, const char *reason);
e13ee7ad 62static void peerDigestFetchAbort(DigestFetchState * fetch, char *buf, const char *reason);
63static void peerDigestReqFinish(DigestFetchState * fetch, char *buf, int, int, int, const char *reason, int err);
64static void peerDigestPDFinish(DigestFetchState * fetch, int pcb_valid, int err);
65static void peerDigestFetchFinish(DigestFetchState * fetch, int err);
66static void peerDigestFetchSetStats(DigestFetchState * fetch);
67static int peerDigestSetCBlock(PeerDigest * pd, const char *buf);
68static int peerDigestUseful(const PeerDigest * pd);
395b813e 69
9b7de833 70
71/* local constants */
9d486b43 72
9b7de833 73#define StoreDigestCBlockSize sizeof(StoreDigestCBlock)
74
e13ee7ad 75/* min interval for requesting digests from a given peer */
76static const time_t PeerDigestReqMinGap = 5 * 60; /* seconds */
77/* min interval for requesting digests (cumulative request stream) */
78static const time_t GlobDigestReqMinGap = 1 * 60; /* seconds */
bd890734 79
80/* local vars */
9d486b43 81
8a6218c6 82static time_t pd_last_req_time = 0; /* last call to Check */
9b7de833 83
e13ee7ad 84/* initialize peer digest */
85static void
8a6218c6 86peerDigestInit(PeerDigest * pd, peer * p)
9b7de833 87{
e13ee7ad 88 assert(pd && p);
89
90 memset(pd, 0, sizeof(*pd));
91 pd->peer = p;
92 /* if peer disappears, we will know it's name */
528b2c61 93 pd->host = p->host;
e13ee7ad 94
95 pd->times.initialized = squid_curtime;
9b7de833 96}
97
4b4cd312 98static void
8a6218c6 99peerDigestClean(PeerDigest * pd)
9b7de833 100{
e13ee7ad 101 assert(pd);
62e76326 102
e13ee7ad 103 if (pd->cd)
62e76326 104 cacheDigestDestroy(pd->cd);
105
528b2c61 106 pd->host.clean();
9b7de833 107}
108
0353e724 109CBDATA_CLASS_INIT(PeerDigest);
110
111void *
112PeerDigest::operator new (size_t)
113{
114 CBDATA_INIT_TYPE(PeerDigest);
115 PeerDigest *result = cbdataAlloc(PeerDigest);
116 return result;
117}
118
119void
120PeerDigest::operator delete (void *address)
121{
122 PeerDigest *t = static_cast<PeerDigest *>(address);
123 cbdataFree(t);
124}
125
e13ee7ad 126/* allocate new peer digest, call Init, and lock everything */
127PeerDigest *
8a6218c6 128peerDigestCreate(peer * p)
e13ee7ad 129{
130 PeerDigest *pd;
8a6218c6 131 assert(p);
e13ee7ad 132
0353e724 133 pd = new PeerDigest;
e13ee7ad 134 peerDigestInit(pd, p);
e13ee7ad 135
fa80a8ef 136 /* XXX This does not look right, and the same thing again in the caller */
137 return cbdataReference(pd);
e13ee7ad 138}
139
140/* call Clean and free/unlock everything */
2d72d4fd 141static void
8a6218c6 142peerDigestDestroy(PeerDigest * pd)
e13ee7ad 143{
8abf232c 144 void *p;
e13ee7ad 145 assert(pd);
8abf232c 146 void * peerTmp = pd->peer;
e13ee7ad 147
fa80a8ef 148 /* inform peer (if any) that we are gone */
62e76326 149
8abf232c 150 if (cbdataReferenceValidDone(peerTmp, &p))
151 peerNoteDigestGone((peer *)p);
e13ee7ad 152
153 peerDigestClean(pd);
62e76326 154
00d77d6b 155 delete pd;
e13ee7ad 156}
157
158/* called by peer to indicate that somebody actually needs this digest */
159void
8a6218c6 160peerDigestNeeded(PeerDigest * pd)
e13ee7ad 161{
162 assert(pd);
163 assert(!pd->flags.needed);
164 assert(!pd->cd);
165
166 pd->flags.needed = 1;
167 pd->times.needed = squid_curtime;
8a6218c6 168 peerDigestSetCheck(pd, 0); /* check asap */
e13ee7ad 169}
170
/* currently we do not have a reason to disable without destroying */
#if FUTURE_CODE
/*
 * Disable the digest of a peer for good: no further checks are scheduled,
 * the digest is flagged unusable and its cache digest is destroyed.
 * The PeerDigest object itself survives to preserve its "history" and stats.
 */
static void
peerDigestDisable(PeerDigest * pd)
{
    debug(72, 2) ("peerDigestDisable: peer %s disabled for good\n",
                  pd->host.buf());

    pd->flags.usable = 0;
    pd->times.disabled = squid_curtime;
    pd->times.next_check = -1;	/* never */

    if (pd->cd != NULL) {
        cacheDigestDestroy(pd->cd);
        pd->cd = NULL;
    }
}

#endif
395b813e 192
e13ee7ad 193/* increment retry delay [after an unsuccessful attempt] */
395b813e 194static time_t
8a6218c6 195peerDigestIncDelay(const PeerDigest * pd)
395b813e 196{
e13ee7ad 197 assert(pd);
198 return pd->times.retry_delay > 0 ?
62e76326 199 2 * pd->times.retry_delay : /* exponential backoff */
200 PeerDigestReqMinGap; /* minimal delay */
395b813e 201}
202
62e76326 203/* artificially increases Expires: setting to avoid race conditions
e13ee7ad 204 * returns the delay till that [increased] expiration time */
00485c29 205static time_t
e13ee7ad 206peerDigestNewDelay(const StoreEntry * e)
00485c29 207{
e13ee7ad 208 assert(e);
62e76326 209
00485c29 210 if (e->expires > 0)
62e76326 211 return e->expires + PeerDigestReqMinGap - squid_curtime;
212
e13ee7ad 213 return PeerDigestReqMinGap;
00485c29 214}
215
e13ee7ad 216/* registers next digest verification */
395b813e 217static void
e13ee7ad 218peerDigestSetCheck(PeerDigest * pd, time_t delay)
395b813e 219{
e13ee7ad 220 eventAdd("peerDigestCheck", peerDigestCheck, pd, (double) delay, 1);
221 pd->times.next_check = squid_curtime + delay;
222 debug(72, 3) ("peerDigestSetCheck: will check peer %s in %d secs\n",
62e76326 223 pd->host.buf(), (int) delay);
e13ee7ad 224}
225
5385c86a 226/*
227 * called when peer is about to disappear or have already disappeared
228 */
e13ee7ad 229void
8a6218c6 230peerDigestNotePeerGone(PeerDigest * pd)
231{
e13ee7ad 232 if (pd->flags.requested) {
62e76326 233 debug(72, 2) ("peerDigest: peer %s gone, will destroy after fetch.\n",
234 pd->host.buf());
235 /* do nothing now, the fetching chain will notice and take action */
395b813e 236 } else {
62e76326 237 debug(72, 2) ("peerDigest: peer %s is gone, destroying now.\n",
238 pd->host.buf());
239 peerDigestDestroy(pd);
395b813e 240 }
241}
242
e13ee7ad 243/* callback for eventAdd() (with peer digest locked)
244 * request new digest if our copy is too old or if we lack one;
245 * schedule next check otherwise */
9b7de833 246static void
e13ee7ad 247peerDigestCheck(void *data)
9b7de833 248{
e6ccf245 249 PeerDigest *pd = (PeerDigest *)data;
e13ee7ad 250 time_t req_time;
251
e13ee7ad 252 assert(!pd->flags.requested);
5d9bb360 253
8a6218c6 254 pd->times.next_check = 0; /* unknown */
e13ee7ad 255
fa80a8ef 256 if (!cbdataReferenceValid(pd->peer)) {
62e76326 257 peerDigestNotePeerGone(pd);
258 return;
9b7de833 259 }
62e76326 260
e13ee7ad 261 debug(72, 3) ("peerDigestCheck: peer %s:%d\n", pd->peer->host, pd->peer->http_port);
38650cc8 262 debug(72, 3) ("peerDigestCheck: time: %ld, last received: %ld (%+d)\n",
62e76326 263 (long int) squid_curtime, (long int) pd->times.received, (int) (squid_curtime - pd->times.received));
e13ee7ad 264
265 /* decide when we should send the request:
266 * request now unless too close to other requests */
267 req_time = squid_curtime;
268
269 /* per-peer limit */
62e76326 270
e13ee7ad 271 if (req_time - pd->times.received < PeerDigestReqMinGap) {
62e76326 272 debug(72, 2) ("peerDigestCheck: %s, avoiding close peer requests (%d < %d secs).\n",
273 pd->host.buf(), (int) (req_time - pd->times.received),
274 (int) PeerDigestReqMinGap);
275 req_time = pd->times.received + PeerDigestReqMinGap;
bd890734 276 }
62e76326 277
e13ee7ad 278 /* global limit */
279 if (req_time - pd_last_req_time < GlobDigestReqMinGap) {
62e76326 280 debug(72, 2) ("peerDigestCheck: %s, avoiding close requests (%d < %d secs).\n",
281 pd->host.buf(), (int) (req_time - pd_last_req_time),
282 (int) GlobDigestReqMinGap);
283 req_time = pd_last_req_time + GlobDigestReqMinGap;
9b7de833 284 }
62e76326 285
e13ee7ad 286 if (req_time <= squid_curtime)
62e76326 287 peerDigestRequest(pd); /* will set pd->flags.requested */
e13ee7ad 288 else
62e76326 289 peerDigestSetCheck(pd, req_time - squid_curtime);
9b7de833 290}
291
2812146b 292CBDATA_TYPE(DigestFetchState);
293
e13ee7ad 294/* ask store for a digest */
9b7de833 295static void
e13ee7ad 296peerDigestRequest(PeerDigest * pd)
9b7de833 297{
e13ee7ad 298 peer *p = pd->peer;
9b7de833 299 StoreEntry *e, *old_e;
300 char *url;
9d486b43 301 const cache_key *key;
190154cf 302 HttpRequest *req;
9b7de833 303 DigestFetchState *fetch = NULL;
528b2c61 304 StoreIOBuffer tempBuffer;
e13ee7ad 305
306 pd->req_result = NULL;
307 pd->flags.requested = 1;
308
9b7de833 309 /* compute future request components */
62e76326 310
7e3ce7b9 311 if (p->digest_url)
62e76326 312 url = xstrdup(p->digest_url);
7e3ce7b9 313 else
62e76326 314 url = internalRemoteUri(p->host, p->http_port,
315 "/squid-internal-periodic/", StoreDigestFileName);
7e3ce7b9 316
50a97cee 317 req = urlParse(METHOD_GET, url);
62e76326 318
e13ee7ad 319 assert(req);
62e76326 320
f66a9ef4 321 key = storeKeyPublicByRequest(req);
62e76326 322
f66a9ef4 323 debug(72, 2) ("peerDigestRequest: %s key: %s\n", url, storeKeyText(key));
e13ee7ad 324
9b7de833 325 /* add custom headers */
2246b732 326 assert(!req->header.len);
62e76326 327
4820f62b 328 httpHeaderPutStr(&req->header, HDR_ACCEPT, StoreDigestMimeStr);
62e76326 329
4820f62b 330 httpHeaderPutStr(&req->header, HDR_ACCEPT, "text/html");
62e76326 331
9bc73deb 332 if (p->login)
62e76326 333 xstrncpy(req->login, p->login, MAX_LOGIN_SZ);
334
9b7de833 335 /* create fetch state structure */
2812146b 336 CBDATA_INIT_TYPE(DigestFetchState);
62e76326 337
72711e31 338 fetch = cbdataAlloc(DigestFetchState);
62e76326 339
903c39e4 340 fetch->request = requestLink(req);
62e76326 341
fa80a8ef 342 fetch->pd = cbdataReference(pd);
62e76326 343
e13ee7ad 344 fetch->offset = 0;
62e76326 345
add2192d 346 fetch->state = DIGEST_READ_REPLY;
e13ee7ad 347
348 /* update timestamps */
9b7de833 349 fetch->start_time = squid_curtime;
62e76326 350
e13ee7ad 351 pd->times.requested = squid_curtime;
62e76326 352
e13ee7ad 353 pd_last_req_time = squid_curtime;
354
92695e5e 355 req->flags.cachable = 1;
62e76326 356
9b7de833 357 /* the rest is based on clientProcessExpired() */
92695e5e 358 req->flags.refresh = 1;
62e76326 359
c8f4eac4 360 old_e = fetch->old_entry = Store::Root().get(key);
62e76326 361
9b7de833 362 if (old_e) {
62e76326 363 debug(72, 5) ("peerDigestRequest: found old entry\n");
364 storeLockObject(old_e);
365 storeCreateMemObject(old_e, url, url);
366 fetch->old_sc = storeClientListAdd(old_e, fetch);
9b7de833 367 }
62e76326 368
9b7de833 369 e = fetch->entry = storeCreateEntry(url, url, req->flags, req->method);
e13ee7ad 370 assert(EBIT_TEST(e->flags, KEY_PRIVATE));
06d2839d 371 fetch->sc = storeClientListAdd(e, fetch);
9b7de833 372 /* set lastmod to trigger IMS request if possible */
62e76326 373
9b7de833 374 if (old_e)
62e76326 375 e->lastmod = old_e->lastmod;
e13ee7ad 376
9b7de833 377 /* push towards peer cache */
e13ee7ad 378 debug(72, 3) ("peerDigestRequest: forwarding to fwdStart...\n");
62e76326 379
2225feb6 380 FwdState::fwdStart(-1, e, req);
62e76326 381
598465f1 382 tempBuffer.offset = 0;
62e76326 383
f95db407 384 tempBuffer.length = SM_PAGE_SIZE;
62e76326 385
598465f1 386 tempBuffer.data = fetch->buf;
62e76326 387
598465f1 388 storeClientCopy(fetch->sc, e, tempBuffer,
62e76326 389 peerDigestHandleReply, fetch);
9b7de833 390}
391
add2192d 392
393/* Handle the data copying .. */
394
395/*
396 * This routine handles the copy data and then redirects the
397 * copy to a bunch of subfunctions depending upon the copy state.
398 * It also tracks the buffer offset and "seen", since I'm actually
399 * not interested in rewriting everything to suit my little idea.
400 */
9b7de833 401static void
598465f1 402peerDigestHandleReply(void *data, StoreIOBuffer recievedData)
add2192d 403{
e6ccf245 404 DigestFetchState *fetch = (DigestFetchState *)data;
add2192d 405 int retsize = -1;
406 digest_read_state_t prevstate;
407 int newsize;
408
e4a67a80 409 assert(fetch->pd && recievedData.data);
598465f1 410 /* The existing code assumes that the recieved pointer is
411 * where we asked the data to be put
412 */
413 assert(fetch->buf + fetch->bufofs == recievedData.data);
add2192d 414
415 /* Update the buffer size */
598465f1 416 fetch->bufofs += recievedData.length;
add2192d 417
418 assert(fetch->bufofs <= SM_PAGE_SIZE);
419
420 /* If we've fetched enough, return */
62e76326 421
add2192d 422 if (peerDigestFetchedEnough(fetch, fetch->buf, fetch->bufofs, "peerDigestHandleReply"))
62e76326 423 return;
add2192d 424
425 /* Call the right function based on the state */
426 /* (Those functions will update the state if needed) */
fa80a8ef 427
e2848e94 428 /* Give us a temporary reference. Some of the calls we make may
429 * try to destroy the fetch structure, and we like to know if they
430 * do
bac6d4bd 431 */
e2848e94 432 fetch = cbdataReference(fetch);
add2192d 433
434 /* Repeat this loop until we're out of data OR the state changes */
435 /* (So keep going if the state has changed and we still have data */
436 do {
62e76326 437 prevstate = fetch->state;
438
439 switch (fetch->state) {
440
441 case DIGEST_READ_REPLY:
442 retsize = peerDigestFetchReply(fetch, fetch->buf, fetch->bufofs);
443 break;
444
445 case DIGEST_READ_HEADERS:
446 retsize = peerDigestSwapInHeaders(fetch, fetch->buf, fetch->bufofs);
447 break;
448
449 case DIGEST_READ_CBLOCK:
450 retsize = peerDigestSwapInCBlock(fetch, fetch->buf, fetch->bufofs);
451 break;
452
453 case DIGEST_READ_MASK:
454 retsize = peerDigestSwapInMask(fetch, fetch->buf, fetch->bufofs);
455 break;
456
457 case DIGEST_READ_NONE:
458 break;
459
460 case DIGEST_READ_DONE:
461 goto finish;
462 break;
463
464 default:
465 fatal("Bad digest transfer mode!\n");
466 }
467
468 if (retsize < 0)
469 goto finish;
470
471 /*
472 * The returned size indicates how much of the buffer was read -
473 * so move the remainder of the buffer to the beginning
474 * and update the bufofs / bufsize
475 */
476 newsize = fetch->bufofs - retsize;
477
478 xmemmove(fetch->buf, fetch->buf + retsize, fetch->bufofs - newsize);
479
480 fetch->bufofs = newsize;
add2192d 481
e2848e94 482 } while (cbdataReferenceValid(fetch) && prevstate != fetch->state && fetch->bufofs > 0);
add2192d 483
484 /* Update the copy offset */
598465f1 485 fetch->offset += recievedData.length;
add2192d 486
487 /* Schedule another copy */
fa80a8ef 488 if (cbdataReferenceValid(fetch)) {
62e76326 489 StoreIOBuffer tempBuffer;
490 tempBuffer.offset = fetch->offset;
491 tempBuffer.length = SM_PAGE_SIZE - fetch->bufofs;
492 tempBuffer.data = fetch->buf + fetch->bufofs;
493 storeClientCopy(fetch->sc, fetch->entry, tempBuffer,
494 peerDigestHandleReply, fetch);
add2192d 495 }
62e76326 496
497finish:
e2848e94 498 /* Get rid of our reference, we've finished with it for now */
499 cbdataReferenceDone(fetch);
add2192d 500}
501
502
503
504/* wait for full http headers to be received then parse them */
505/*
506 * This routine handles parsing the reply line.
507 * If the reply line indicates an OK, the same data is thrown
508 * to SwapInHeaders(). If the reply line is a NOT_MODIFIED,
509 * we simply stop parsing.
510 */
511static int
9b7de833 512peerDigestFetchReply(void *data, char *buf, ssize_t size)
513{
e6ccf245 514 DigestFetchState *fetch = (DigestFetchState *)data;
e13ee7ad 515 PeerDigest *pd = fetch->pd;
9bc73deb 516 size_t hdr_size;
e13ee7ad 517 assert(pd && buf);
9b7de833 518 assert(!fetch->offset);
e13ee7ad 519
add2192d 520 assert(fetch->state == DIGEST_READ_REPLY);
62e76326 521
9b7de833 522 if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestFetchReply"))
62e76326 523 return -1;
e13ee7ad 524
9bc73deb 525 if ((hdr_size = headersEnd(buf, size))) {
62e76326 526 http_status status;
527 HttpReply const *reply = fetch->entry->getReply();
528 assert(reply);
529 assert (reply->sline.status != 0);
530 status = reply->sline.status;
531 debug(72, 3) ("peerDigestFetchReply: %s status: %d, expires: %ld (%+d)\n",
532 pd->host.buf(), status,
533 (long int) reply->expires, (int) (reply->expires - squid_curtime));
534
535 /* this "if" is based on clientHandleIMSReply() */
536
537 if (status == HTTP_NOT_MODIFIED) {
190154cf 538 HttpRequest *r = NULL;
62e76326 539 /* our old entry is fine */
540 assert(fetch->old_entry);
541
542 if (!fetch->old_entry->mem_obj->request)
543 fetch->old_entry->mem_obj->request = r =
544 requestLink(fetch->entry->mem_obj->request);
545
546 assert(fetch->old_entry->mem_obj->request);
547
07947ad8 548 HttpReply *old_rep = (HttpReply *) fetch->old_entry->getReply();
549
06a5ae20 550 old_rep->updateOnNotModified(reply);
62e76326 551
552 storeTimestampsSet(fetch->old_entry);
553
554 /* get rid of 304 reply */
555 storeUnregister(fetch->sc, fetch->entry, fetch);
556
557 storeUnlockObject(fetch->entry);
558
559 fetch->entry = fetch->old_entry;
560
561 fetch->old_entry = NULL;
562
563 /* preserve request -- we need its size to update counters */
564 /* requestUnlink(r); */
565 /* fetch->entry->mem_obj->request = NULL; */
566 } else if (status == HTTP_OK) {
567 /* get rid of old entry if any */
568
569 if (fetch->old_entry) {
570 debug(72, 3) ("peerDigestFetchReply: got new digest, releasing old one\n");
571 storeUnregister(fetch->old_sc, fetch->old_entry, fetch);
572 storeReleaseRequest(fetch->old_entry);
573 storeUnlockObject(fetch->old_entry);
574 fetch->old_entry = NULL;
575 }
576 } else {
577 /* some kind of a bug */
578 peerDigestFetchAbort(fetch, buf, httpStatusLineReason(&reply->sline));
579 return -1; /* XXX -1 will abort stuff in ReadReply! */
580 }
581
582 /* must have a ready-to-use store entry if we got here */
583 /* can we stay with the old in-memory digest? */
584 if (status == HTTP_NOT_MODIFIED && fetch->pd->cd) {
585 peerDigestFetchStop(fetch, buf, "Not modified");
586 fetch->state = DIGEST_READ_DONE;
587 } else {
588 fetch->state = DIGEST_READ_HEADERS;
589 }
9b7de833 590 } else {
62e76326 591 /* need more data, do we have space? */
592
593 if (size >= SM_PAGE_SIZE)
594 peerDigestFetchAbort(fetch, buf, "reply header too big");
9b7de833 595 }
add2192d 596
597 /* We don't want to actually ack that we've handled anything,
598 * otherwise SwapInHeaders() won't get the reply line .. */
599 return 0;
9b7de833 600}
601
602/* fetch headers from disk, pass on to SwapInCBlock */
1f140227 603int
9b7de833 604peerDigestSwapInHeaders(void *data, char *buf, ssize_t size)
605{
e6ccf245 606 DigestFetchState *fetch = (DigestFetchState *)data;
9b7de833 607 size_t hdr_size;
e13ee7ad 608
add2192d 609 assert(fetch->state == DIGEST_READ_HEADERS);
62e76326 610
9b7de833 611 if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestSwapInHeaders"))
62e76326 612 return -1;
e13ee7ad 613
9d486b43 614 assert(!fetch->offset);
62e76326 615
9b7de833 616 if ((hdr_size = headersEnd(buf, size))) {
62e76326 617 assert(fetch->entry->getReply());
618 assert (fetch->entry->getReply()->sline.status != 0);
619
620 if (fetch->entry->getReply()->sline.status != HTTP_OK) {
621 debug(72, 1) ("peerDigestSwapInHeaders: %s status %d got cached!\n",
622 fetch->pd->host.buf(), fetch->entry->getReply()->sline.status);
623 peerDigestFetchAbort(fetch, buf, "internal status error");
624 return -1;
625 }
626
627 fetch->state = DIGEST_READ_CBLOCK;
628 return hdr_size; /* Say how much data we read */
9b7de833 629 } else {
62e76326 630 /* need more data, do we have space? */
631
632 if (size >= SM_PAGE_SIZE) {
633 peerDigestFetchAbort(fetch, buf, "stored header too big");
634 return -1;
635 } else {
636 return 0; /* We need to read more to parse .. */
637 }
9b7de833 638 }
62e76326 639
add2192d 640 fatal("peerDigestSwapInHeaders() - shouldn't get here!\n");
9b7de833 641}
642
1f140227 643int
9b7de833 644peerDigestSwapInCBlock(void *data, char *buf, ssize_t size)
645{
e6ccf245 646 DigestFetchState *fetch = (DigestFetchState *)data;
e13ee7ad 647
add2192d 648 assert(fetch->state == DIGEST_READ_CBLOCK);
62e76326 649
9b7de833 650 if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestSwapInCBlock"))
62e76326 651 return -1;
e13ee7ad 652
e6ccf245 653 if (size >= (ssize_t)StoreDigestCBlockSize) {
62e76326 654 PeerDigest *pd = fetch->pd;
62e76326 655
e4a67a80 656 assert(pd && fetch->entry->getReply());
62e76326 657
658 if (peerDigestSetCBlock(pd, buf)) {
659 /* XXX: soon we will have variable header size */
660 /* switch to CD buffer and fetch digest guts */
661 buf = NULL;
662 assert(pd->cd->mask);
663 fetch->state = DIGEST_READ_MASK;
664 return StoreDigestCBlockSize;
665 } else {
666 peerDigestFetchAbort(fetch, buf, "invalid digest cblock");
667 return -1;
668 }
9b7de833 669 } else {
62e76326 670 /* need more data, do we have space? */
671
672 if (size >= SM_PAGE_SIZE) {
673 peerDigestFetchAbort(fetch, buf, "digest cblock too big");
674 return -1;
675 } else {
676 return 0; /* We need more data */
677 }
9b7de833 678 }
62e76326 679
add2192d 680 fatal("peerDigestSwapInCBlock(): shouldn't get here!\n");
9b7de833 681}
682
1f140227 683int
9b7de833 684peerDigestSwapInMask(void *data, char *buf, ssize_t size)
685{
e6ccf245 686 DigestFetchState *fetch = (DigestFetchState *)data;
e13ee7ad 687 PeerDigest *pd;
688
e13ee7ad 689 pd = fetch->pd;
690 assert(pd->cd && pd->cd->mask);
9d486b43 691
add2192d 692 /*
693 * NOTENOTENOTENOTENOTE: buf doesn't point to pd->cd->mask anymore!
694 * we need to do the copy ourselves!
695 */
696 xmemcpy(pd->cd->mask + fetch->mask_offset, buf, size);
697
698 /* NOTE! buf points to the middle of pd->cd->mask! */
62e76326 699
add2192d 700 if (peerDigestFetchedEnough(fetch, NULL, size, "peerDigestSwapInMask"))
62e76326 701 return -1;
add2192d 702
da407def 703 fetch->mask_offset += size;
62e76326 704
e6ccf245 705 if (fetch->mask_offset >= (off_t)pd->cd->mask_size) {
e4049756 706 debugs(72, 2, "peerDigestSwapInMask: Done! Got " <<
707 fetch->mask_offset << ", expected " << pd->cd->mask_size);
62e76326 708 assert(fetch->mask_offset == (off_t)pd->cd->mask_size);
709 assert(peerDigestFetchedEnough(fetch, NULL, 0, "peerDigestSwapInMask"));
710 return -1; /* XXX! */
e13ee7ad 711 } else {
62e76326 712 /* We always read everything, so return so */
713 return size;
9b7de833 714 }
62e76326 715
add2192d 716 fatal("peerDigestSwapInMask(): shouldn't get here!\n");
9b7de833 717}
718
719static int
4b4cd312 720peerDigestFetchedEnough(DigestFetchState * fetch, char *buf, ssize_t size, const char *step_name)
9b7de833 721{
e13ee7ad 722 PeerDigest *pd = NULL;
8a6218c6 723 const char *host = "<unknown>"; /* peer host */
724 const char *reason = NULL; /* reason for completion */
725 const char *no_bug = NULL; /* successful completion if set */
e2848e94 726 const int pdcb_valid = cbdataReferenceValid(fetch->pd);
727 const int pcb_valid = cbdataReferenceValid(fetch->pd->peer);
e13ee7ad 728
729 /* test possible exiting conditions (the same for most steps!)
730 * cases marked with '?!' should not happen */
731
732 if (!reason) {
62e76326 733 if (!(pd = fetch->pd))
734 reason = "peer digest disappeared?!";
735
bac6d4bd 736#if DONT /* WHY NOT? /HNO */
62e76326 737
738 else if (!cbdataReferenceValid(pd))
739 reason = "invalidated peer digest?!";
740
5385c86a 741#endif
62e76326 742
743 else
744 host = pd->host.buf();
e13ee7ad 745 }
62e76326 746
e4049756 747 debugs(72, 6, step_name << ": peer " << host << ", offset: " <<
748 fetch->offset << " size: " << size << ".");
e13ee7ad 749
750 /* continue checking (with pd and host known and valid) */
62e76326 751
e13ee7ad 752 if (!reason) {
62e76326 753 if (!cbdataReferenceValid(pd->peer))
754 reason = "peer disappeared";
755 else if (size < 0)
756 reason = "swap failure";
757 else if (!fetch->entry)
758 reason = "swap aborted?!";
759 else if (EBIT_TEST(fetch->entry->flags, ENTRY_ABORTED))
760 reason = "swap aborted";
e13ee7ad 761 }
62e76326 762
e13ee7ad 763 /* continue checking (maybe-successful eof case) */
764 if (!reason && !size) {
62e76326 765 if (!pd->cd)
766 reason = "null digest?!";
767 else if (fetch->mask_offset != (off_t)pd->cd->mask_size)
768 reason = "premature end of digest?!";
769 else if (!peerDigestUseful(pd))
770 reason = "useless digest";
771 else
772 reason = no_bug = "success";
e13ee7ad 773 }
62e76326 774
e13ee7ad 775 /* finish if we have a reason */
9b7de833 776 if (reason) {
62e76326 777 const int level = strstr(reason, "?!") ? 1 : 3;
778 debug(72, level) ("%s: peer %s, exiting after '%s'\n",
779 step_name, host, reason);
780 peerDigestReqFinish(fetch, buf,
781 1, pdcb_valid, pcb_valid, reason, !no_bug);
e13ee7ad 782 } else {
62e76326 783 /* paranoid check */
784 assert(pdcb_valid && pcb_valid);
e13ee7ad 785 }
62e76326 786
e13ee7ad 787 return reason != NULL;
788}
789
551e60a9 790/* call this when all callback data is valid and fetch must be stopped but
791 * no error has occurred (e.g. we received 304 reply and reuse old digest) */
792static void
793peerDigestFetchStop(DigestFetchState * fetch, char *buf, const char *reason)
794{
795 assert(reason);
796 debug(72, 2) ("peerDigestFetchStop: peer %s, reason: %s\n",
62e76326 797 fetch->pd->host.buf(), reason);
551e60a9 798 peerDigestReqFinish(fetch, buf, 1, 1, 1, reason, 0);
799}
800
e13ee7ad 801/* call this when all callback data is valid but something bad happened */
802static void
803peerDigestFetchAbort(DigestFetchState * fetch, char *buf, const char *reason)
804{
551e60a9 805 assert(reason);
806 debug(72, 2) ("peerDigestFetchAbort: peer %s, reason: %s\n",
62e76326 807 fetch->pd->host.buf(), reason);
e13ee7ad 808 peerDigestReqFinish(fetch, buf, 1, 1, 1, reason, 1);
809}
810
811/* complete the digest transfer, update stats, unlock/release everything */
812static void
813peerDigestReqFinish(DigestFetchState * fetch, char *buf,
62e76326 814 int fcb_valid, int pdcb_valid, int pcb_valid,
815 const char *reason, int err)
e13ee7ad 816{
817 assert(reason);
818
819 /* must go before peerDigestPDFinish */
62e76326 820
e13ee7ad 821 if (pdcb_valid) {
62e76326 822 fetch->pd->flags.requested = 0;
823 fetch->pd->req_result = reason;
e13ee7ad 824 }
62e76326 825
e13ee7ad 826 /* schedule next check if peer is still out there */
827 if (pcb_valid) {
62e76326 828 PeerDigest *pd = fetch->pd;
829
830 if (err) {
831 pd->times.retry_delay = peerDigestIncDelay(pd);
832 peerDigestSetCheck(pd, pd->times.retry_delay);
833 } else {
834 pd->times.retry_delay = 0;
835 peerDigestSetCheck(pd, peerDigestNewDelay(fetch->entry));
836 }
9b7de833 837 }
62e76326 838
e13ee7ad 839 /* note: order is significant */
840 if (fcb_valid)
62e76326 841 peerDigestFetchSetStats(fetch);
842
e13ee7ad 843 if (pdcb_valid)
62e76326 844 peerDigestPDFinish(fetch, pcb_valid, err);
845
e13ee7ad 846 if (fcb_valid)
62e76326 847 peerDigestFetchFinish(fetch, err);
9b7de833 848}
849
e13ee7ad 850
851/* destroys digest if peer disappeared
852 * must be called only when fetch and pd cbdata are valid */
9b7de833 853static void
e13ee7ad 854peerDigestPDFinish(DigestFetchState * fetch, int pcb_valid, int err)
9b7de833 855{
e13ee7ad 856 PeerDigest *pd = fetch->pd;
528b2c61 857 const char *host = pd->host.buf();
e13ee7ad 858
859 pd->times.received = squid_curtime;
860 pd->times.req_delay = fetch->resp_time;
8a6218c6 861 kb_incr(&pd->stats.sent.kbytes, (size_t) fetch->sent.bytes);
862 kb_incr(&pd->stats.recv.kbytes, (size_t) fetch->recv.bytes);
e13ee7ad 863 pd->stats.sent.msgs += fetch->sent.msg;
864 pd->stats.recv.msgs += fetch->recv.msg;
865
866 if (err) {
62e76326 867 debug(72, 1) ("%sdisabling (%s) digest from %s\n",
868 pcb_valid ? "temporary " : "",
869 pd->req_result, host);
870
871 if (pd->cd) {
872 cacheDigestDestroy(pd->cd);
873 pd->cd = NULL;
874 }
875
876 pd->flags.usable = 0;
877
878 if (!pcb_valid)
879 peerDigestNotePeerGone(pd);
e13ee7ad 880 } else {
62e76326 881 assert(pcb_valid);
882
883 pd->flags.usable = 1;
e13ee7ad 884
62e76326 885 /* XXX: ugly condition, but how? */
e13ee7ad 886
62e76326 887 if (fetch->entry->store_status == STORE_OK)
888 debug(72, 2) ("re-used old digest from %s\n", host);
889 else
890 debug(72, 2) ("received valid digest from %s\n", host);
d1cdaa16 891 }
62e76326 892
fa80a8ef 893 cbdataReferenceDone(fetch->pd);
e13ee7ad 894}
895
896/* free fetch state structures
897 * must be called only when fetch cbdata is valid */
898static void
899peerDigestFetchFinish(DigestFetchState * fetch, int err)
900{
901 assert(fetch->entry && fetch->request);
902
9b7de833 903 if (fetch->old_entry) {
62e76326 904 debug(72, 2) ("peerDigestFetchFinish: deleting old entry\n");
8121ba82 905 storeUnregister(fetch->old_sc, fetch->old_entry, fetch);
62e76326 906 storeReleaseRequest(fetch->old_entry);
907 storeUnlockObject(fetch->old_entry);
908 fetch->old_entry = NULL;
9b7de833 909 }
62e76326 910
1543ab6c 911 /* update global stats */
83704487 912 kb_incr(&statCounter.cd.kbytes_sent, (size_t) fetch->sent.bytes);
62e76326 913
83704487 914 kb_incr(&statCounter.cd.kbytes_recv, (size_t) fetch->recv.bytes);
62e76326 915
83704487 916 statCounter.cd.msgs_sent += fetch->sent.msg;
62e76326 917
83704487 918 statCounter.cd.msgs_recv += fetch->recv.msg;
e13ee7ad 919
1543ab6c 920 /* unlock everything */
06d2839d 921 storeUnregister(fetch->sc, fetch->entry, fetch);
62e76326 922
9b7de833 923 storeUnlockObject(fetch->entry);
62e76326 924
903c39e4 925 requestUnlink(fetch->request);
62e76326 926
9b7de833 927 fetch->entry = NULL;
62e76326 928
903c39e4 929 fetch->request = NULL;
62e76326 930
3855c318 931 assert(fetch->pd == NULL);
62e76326 932
9b7de833 933 cbdataFree(fetch);
9b7de833 934}
935
e13ee7ad 936/* calculate fetch stats after completion */
937static void
938peerDigestFetchSetStats(DigestFetchState * fetch)
939{
940 MemObject *mem;
941 assert(fetch->entry && fetch->request);
942
943 mem = fetch->entry->mem_obj;
944 assert(mem);
945
946 /* XXX: outgoing numbers are not precise */
947 /* XXX: we must distinguish between 304 hits and misses here */
e11fe29a 948 fetch->sent.bytes = fetch->request->prefixLen();
528b2c61 949 /* XXX: this is slightly wrong: we don't KNOW that the entire memobject
950 * was fetched. We only know how big it is
951 */
952 fetch->recv.bytes = mem->size();
e13ee7ad 953 fetch->sent.msg = fetch->recv.msg = 1;
954 fetch->expires = fetch->entry->expires;
955 fetch->resp_time = squid_curtime - fetch->start_time;
956
957 debug(72, 3) ("peerDigestFetchFinish: recv %d bytes in %d secs\n",
62e76326 958 fetch->recv.bytes, (int) fetch->resp_time);
38650cc8 959 debug(72, 3) ("peerDigestFetchFinish: expires: %ld (%+d), lmt: %ld (%+d)\n",
62e76326 960 (long int) fetch->expires, (int) (fetch->expires - squid_curtime),
961 (long int) fetch->entry->lastmod, (int) (fetch->entry->lastmod - squid_curtime));
e13ee7ad 962}
963
964
9b7de833 965static int
8a6218c6 966peerDigestSetCBlock(PeerDigest * pd, const char *buf)
9b7de833 967{
968 StoreDigestCBlock cblock;
969 int freed_size = 0;
528b2c61 970 const char *host = pd->host.buf();
e13ee7ad 971
9b7de833 972 xmemcpy(&cblock, buf, sizeof(cblock));
973 /* network -> host conversions */
974 cblock.ver.current = ntohs(cblock.ver.current);
975 cblock.ver.required = ntohs(cblock.ver.required);
976 cblock.capacity = ntohl(cblock.capacity);
977 cblock.count = ntohl(cblock.count);
978 cblock.del_count = ntohl(cblock.del_count);
979 cblock.mask_size = ntohl(cblock.mask_size);
4b4cd312 980 debug(72, 2) ("got digest cblock from %s; ver: %d (req: %d)\n",
62e76326 981 host, (int) cblock.ver.current, (int) cblock.ver.required);
4b4cd312 982 debug(72, 2) ("\t size: %d bytes, e-cnt: %d, e-util: %d%%\n",
62e76326 983 cblock.mask_size, cblock.count,
984 xpercentInt(cblock.count, cblock.capacity));
6106c6fc 985 /* check version requirements (both ways) */
62e76326 986
9b7de833 987 if (cblock.ver.required > CacheDigestVer.current) {
62e76326 988 debug(72, 1) ("%s digest requires version %d; have: %d\n",
989 host, cblock.ver.required, CacheDigestVer.current);
990 return 0;
9b7de833 991 }
62e76326 992
6106c6fc 993 if (cblock.ver.current < CacheDigestVer.required) {
62e76326 994 debug(72, 1) ("%s digest is version %d; we require: %d\n",
995 host, cblock.ver.current, CacheDigestVer.required);
996 return 0;
6106c6fc 997 }
62e76326 998
9b7de833 999 /* check consistency */
4b4cd312 1000 if (cblock.ver.required > cblock.ver.current ||
62e76326 1001 cblock.mask_size <= 0 || cblock.capacity <= 0 ||
1002 cblock.bits_per_entry <= 0 || cblock.hash_func_count <= 0) {
1003 debug(72, 0) ("%s digest cblock is corrupted.\n", host);
1004 return 0;
9b7de833 1005 }
62e76326 1006
d1cdaa16 1007 /* check consistency further */
e6ccf245 1008 if ((size_t)cblock.mask_size != cacheDigestCalcMaskSize(cblock.capacity, cblock.bits_per_entry)) {
e4049756 1009 debugs(72, 0, host << " digest cblock is corrupted " <<
1010 "(mask size mismatch: " << cblock.mask_size << " ? " <<
1011 cacheDigestCalcMaskSize(cblock.capacity, cblock.bits_per_entry)
1012 << ").");
62e76326 1013 return 0;
d1cdaa16 1014 }
62e76326 1015
d1cdaa16 1016 /* there are some things we cannot do yet */
1017 if (cblock.hash_func_count != CacheDigestHashFuncCount) {
62e76326 1018 debug(72, 0) ("%s digest: unsupported #hash functions: %d ? %d.\n",
1019 host, cblock.hash_func_count, CacheDigestHashFuncCount);
1020 return 0;
d1cdaa16 1021 }
62e76326 1022
9b7de833 1023 /*
1024 * no cblock bugs below this point
1025 */
1026 /* check size changes */
e6ccf245 1027 if (pd->cd && cblock.mask_size != (ssize_t)pd->cd->mask_size) {
e4049756 1028 debugs(72, 2, host << " digest changed size: " << cblock.mask_size <<
1029 " -> " << pd->cd->mask_size);
62e76326 1030 freed_size = pd->cd->mask_size;
1031 cacheDigestDestroy(pd->cd);
1032 pd->cd = NULL;
9b7de833 1033 }
62e76326 1034
e13ee7ad 1035 if (!pd->cd) {
62e76326 1036 debug(72, 2) ("creating %s digest; size: %d (%+d) bytes\n",
1037 host, cblock.mask_size, (int) (cblock.mask_size - freed_size));
1038 pd->cd = cacheDigestCreate(cblock.capacity, cblock.bits_per_entry);
1039
1040 if (cblock.mask_size >= freed_size)
1041 kb_incr(&statCounter.cd.memory, cblock.mask_size - freed_size);
9b7de833 1042 }
62e76326 1043
e13ee7ad 1044 assert(pd->cd);
9b7de833 1045 /* these assignments leave us in an inconsistent state until we finish reading the digest */
e13ee7ad 1046 pd->cd->count = cblock.count;
1047 pd->cd->del_count = cblock.del_count;
9b7de833 1048 return 1;
1049}
1050
9b7de833 1051static int
8a6218c6 1052peerDigestUseful(const PeerDigest * pd)
9b7de833 1053{
d1cdaa16 1054 /* TODO: we should calculate the prob of a false hit instead of bit util */
e13ee7ad 1055 const int bit_util = cacheDigestBitUtil(pd->cd);
62e76326 1056
e13ee7ad 1057 if (bit_util > 65) {
62e76326 1058 debug(72, 0) ("Warning: %s peer digest has too many bits on (%d%%).\n",
1059 pd->host.buf(), bit_util);
1060 return 0;
9b7de833 1061 }
62e76326 1062
9b7de833 1063 return 1;
1064}
7f6eb0fe 1065
e13ee7ad 1066static int
1067saneDiff(time_t diff)
1068{
8a6218c6 1069 return abs(diff) > squid_curtime / 2 ? 0 : diff;
e13ee7ad 1070}
1071
1072void
8a6218c6 1073peerDigestStatsReport(const PeerDigest * pd, StoreEntry * e)
e13ee7ad 1074{
1075#define f2s(flag) (pd->flags.flag ? "yes" : "no")
38650cc8 1076#define appendTime(tm) storeAppendPrintf(e, "%s\t %10ld\t %+d\t %+d\n", \
1077 ""#tm, (long int)pd->times.tm, \
e13ee7ad 1078 saneDiff(pd->times.tm - squid_curtime), \
1079 saneDiff(pd->times.tm - pd->times.initialized))
1080
e13ee7ad 1081 assert(pd);
1082
528b2c61 1083 const char *host = pd->host.buf();
e13ee7ad 1084 storeAppendPrintf(e, "\npeer digest from %s\n", host);
1085
1086 cacheDigestGuessStatsReport(&pd->stats.guess, e, host);
1087
1088 storeAppendPrintf(e, "\nevent\t timestamp\t secs from now\t secs from init\n");
1089 appendTime(initialized);
1090 appendTime(needed);
1091 appendTime(requested);
1092 appendTime(received);
1093 appendTime(next_check);
1094
1095 storeAppendPrintf(e, "peer digest state:\n");
1096 storeAppendPrintf(e, "\tneeded: %3s, usable: %3s, requested: %3s\n",
62e76326 1097 f2s(needed), f2s(usable), f2s(requested));
e13ee7ad 1098 storeAppendPrintf(e, "\n\tlast retry delay: %d secs\n",
62e76326 1099 (int) pd->times.retry_delay);
e13ee7ad 1100 storeAppendPrintf(e, "\tlast request response time: %d secs\n",
62e76326 1101 (int) pd->times.req_delay);
e13ee7ad 1102 storeAppendPrintf(e, "\tlast request result: %s\n",
62e76326 1103 pd->req_result ? pd->req_result : "(none)");
e13ee7ad 1104
1105 storeAppendPrintf(e, "\npeer digest traffic:\n");
1106 storeAppendPrintf(e, "\trequests sent: %d, volume: %d KB\n",
62e76326 1107 pd->stats.sent.msgs, (int) pd->stats.sent.kbytes.kb);
e13ee7ad 1108 storeAppendPrintf(e, "\treplies recv: %d, volume: %d KB\n",
62e76326 1109 pd->stats.recv.msgs, (int) pd->stats.recv.kbytes.kb);
e13ee7ad 1110
1111 storeAppendPrintf(e, "\npeer digest structure:\n");
62e76326 1112
e13ee7ad 1113 if (pd->cd)
62e76326 1114 cacheDigestReport(pd->cd, host, e);
e13ee7ad 1115 else
62e76326 1116 storeAppendPrintf(e, "\tno in-memory copy\n");
e13ee7ad 1117}
1118
7f6eb0fe 1119#endif