]> git.ipfire.org Git - thirdparty/squid.git/blame - src/peer_digest.cc
Document the 'carp' cache_peer option
[thirdparty/squid.git] / src / peer_digest.cc
CommitLineData
9b7de833 1
2/*
528b2c61 3 * $Id: peer_digest.cc,v 1.94 2003/01/23 00:37:24 robertc Exp $
9b7de833 4 *
5 * DEBUG: section 72 Peer Digest Routines
6 * AUTHOR: Alex Rousskov
7 *
2b6662ba 8 * SQUID Web Proxy Cache http://www.squid-cache.org/
e25c139f 9 * ----------------------------------------------------------
9b7de833 10 *
2b6662ba 11 * Squid is the result of efforts by numerous individuals from
12 * the Internet community; see the CONTRIBUTORS file for full
13 * details. Many organizations have provided support for Squid's
14 * development; see the SPONSORS file for full details. Squid is
15 * Copyrighted (C) 2001 by the Regents of the University of
16 * California; see the COPYRIGHT file for full details. Squid
17 * incorporates software developed and/or copyrighted by other
18 * sources; see the CREDITS file for full details.
9b7de833 19 *
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
24 *
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
29 *
30 * You should have received a copy of the GNU General Public License
31 * along with this program; if not, write to the Free Software
cbdec147 32 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
e25c139f 33 *
9b7de833 34 */
35
36#include "squid.h"
6cfa8966 37#if USE_CACHE_DIGESTS
7a2e5bb5 38
528b2c61 39#include "Store.h"
40#include "HttpRequest.h"
41#include "HttpReply.h"
42#include "MemObject.h"
598465f1 43#include "StoreClient.h"
44
9b7de833 45/* local types */
46
47/* local prototypes */
e13ee7ad 48static time_t peerDigestIncDelay(const PeerDigest * pd);
49static time_t peerDigestNewDelay(const StoreEntry * e);
50static void peerDigestSetCheck(PeerDigest * pd, time_t delay);
eb16313f 51static void peerDigestClean(PeerDigest *);
e13ee7ad 52static EVH peerDigestCheck;
53static void peerDigestRequest(PeerDigest * pd);
add2192d 54static STCB peerDigestHandleReply;
3dfaa3d2 55static int peerDigestFetchReply(void *, char *, ssize_t);
56static int peerDigestSwapInHeaders(void *, char *, ssize_t);
57static int peerDigestSwapInCBlock(void *, char *, ssize_t);
58static int peerDigestSwapInMask(void *, char *, ssize_t);
4b4cd312 59static int peerDigestFetchedEnough(DigestFetchState * fetch, char *buf, ssize_t size, const char *step_name);
551e60a9 60static void peerDigestFetchStop(DigestFetchState * fetch, char *buf, const char *reason);
e13ee7ad 61static void peerDigestFetchAbort(DigestFetchState * fetch, char *buf, const char *reason);
62static void peerDigestReqFinish(DigestFetchState * fetch, char *buf, int, int, int, const char *reason, int err);
63static void peerDigestPDFinish(DigestFetchState * fetch, int pcb_valid, int err);
64static void peerDigestFetchFinish(DigestFetchState * fetch, int err);
65static void peerDigestFetchSetStats(DigestFetchState * fetch);
66static int peerDigestSetCBlock(PeerDigest * pd, const char *buf);
67static int peerDigestUseful(const PeerDigest * pd);
395b813e 68
9b7de833 69
70/* local constants */
9d486b43 71
9b7de833 72#define StoreDigestCBlockSize sizeof(StoreDigestCBlock)
73
e13ee7ad 74/* min interval for requesting digests from a given peer */
75static const time_t PeerDigestReqMinGap = 5 * 60; /* seconds */
76/* min interval for requesting digests (cumulative request stream) */
77static const time_t GlobDigestReqMinGap = 1 * 60; /* seconds */
bd890734 78
79/* local vars */
9d486b43 80
8a6218c6 81static time_t pd_last_req_time = 0; /* last call to Check */
9b7de833 82
e13ee7ad 83/* initialize peer digest */
84static void
8a6218c6 85peerDigestInit(PeerDigest * pd, peer * p)
9b7de833 86{
e13ee7ad 87 assert(pd && p);
88
89 memset(pd, 0, sizeof(*pd));
90 pd->peer = p;
91 /* if peer disappears, we will know it's name */
528b2c61 92 pd->host = p->host;
e13ee7ad 93
94 pd->times.initialized = squid_curtime;
9b7de833 95}
96
4b4cd312 97static void
8a6218c6 98peerDigestClean(PeerDigest * pd)
9b7de833 99{
e13ee7ad 100 assert(pd);
101 if (pd->cd)
102 cacheDigestDestroy(pd->cd);
528b2c61 103 pd->host.clean();
9b7de833 104}
105
28c60158 106CBDATA_TYPE(PeerDigest);
107
e13ee7ad 108/* allocate new peer digest, call Init, and lock everything */
109PeerDigest *
8a6218c6 110peerDigestCreate(peer * p)
e13ee7ad 111{
112 PeerDigest *pd;
8a6218c6 113 assert(p);
e13ee7ad 114
28c60158 115 CBDATA_INIT_TYPE(PeerDigest);
72711e31 116 pd = cbdataAlloc(PeerDigest);
e13ee7ad 117 peerDigestInit(pd, p);
e13ee7ad 118
fa80a8ef 119 /* XXX This does not look right, and the same thing again in the caller */
120 return cbdataReference(pd);
e13ee7ad 121}
122
123/* call Clean and free/unlock everything */
2d72d4fd 124static void
8a6218c6 125peerDigestDestroy(PeerDigest * pd)
e13ee7ad 126{
fcbbccfe 127 peer *p;
e13ee7ad 128 assert(pd);
e13ee7ad 129
fa80a8ef 130 /* inform peer (if any) that we are gone */
bac6d4bd 131 if (cbdataReferenceValidDone(pd->peer, (void **) &p))
fcbbccfe 132 peerNoteDigestGone(p);
e13ee7ad 133
134 peerDigestClean(pd);
135 cbdataFree(pd);
136}
137
138/* called by peer to indicate that somebody actually needs this digest */
139void
8a6218c6 140peerDigestNeeded(PeerDigest * pd)
e13ee7ad 141{
142 assert(pd);
143 assert(!pd->flags.needed);
144 assert(!pd->cd);
145
146 pd->flags.needed = 1;
147 pd->times.needed = squid_curtime;
8a6218c6 148 peerDigestSetCheck(pd, 0); /* check asap */
e13ee7ad 149}
150
/* currently we do not have a reason to disable without destroying */
#if FUTURE_CODE
/*
 * Disables the peer digest for good: marks it unusable, records when that
 * happened, cancels future checks and drops the cache digest.
 */
static void
peerDigestDisable(PeerDigest * pd)
{
    debug(72, 2) ("peerDigestDisable: peer %s disabled for good\n",
        pd->host.buf());

    pd->flags.usable = 0;
    pd->times.disabled = squid_curtime;
    pd->times.next_check = -1;  /* never */

    /* we do not destroy the pd itself to preserve its "history" and stats */
    if (!pd->cd)
        return;

    cacheDigestDestroy(pd->cd);
    pd->cd = NULL;
}
#endif
395b813e 170
e13ee7ad 171/* increment retry delay [after an unsuccessful attempt] */
395b813e 172static time_t
8a6218c6 173peerDigestIncDelay(const PeerDigest * pd)
395b813e 174{
e13ee7ad 175 assert(pd);
176 return pd->times.retry_delay > 0 ?
8a6218c6 177 2 * pd->times.retry_delay : /* exponential backoff */
178 PeerDigestReqMinGap; /* minimal delay */
395b813e 179}
180
e13ee7ad 181/* artificially increases Expires: setting to avoid race conditions
182 * returns the delay till that [increased] expiration time */
00485c29 183static time_t
e13ee7ad 184peerDigestNewDelay(const StoreEntry * e)
00485c29 185{
e13ee7ad 186 assert(e);
00485c29 187 if (e->expires > 0)
e13ee7ad 188 return e->expires + PeerDigestReqMinGap - squid_curtime;
189 return PeerDigestReqMinGap;
00485c29 190}
191
e13ee7ad 192/* registers next digest verification */
395b813e 193static void
e13ee7ad 194peerDigestSetCheck(PeerDigest * pd, time_t delay)
395b813e 195{
e13ee7ad 196 eventAdd("peerDigestCheck", peerDigestCheck, pd, (double) delay, 1);
197 pd->times.next_check = squid_curtime + delay;
198 debug(72, 3) ("peerDigestSetCheck: will check peer %s in %d secs\n",
528b2c61 199 pd->host.buf(), (int) delay);
e13ee7ad 200}
201
5385c86a 202/*
203 * called when peer is about to disappear or have already disappeared
204 */
e13ee7ad 205void
8a6218c6 206peerDigestNotePeerGone(PeerDigest * pd)
207{
e13ee7ad 208 if (pd->flags.requested) {
5385c86a 209 debug(72, 2) ("peerDigest: peer %s gone, will destroy after fetch.\n",
528b2c61 210 pd->host.buf());
e13ee7ad 211 /* do nothing now, the fetching chain will notice and take action */
395b813e 212 } else {
5385c86a 213 debug(72, 2) ("peerDigest: peer %s is gone, destroying now.\n",
528b2c61 214 pd->host.buf());
e13ee7ad 215 peerDigestDestroy(pd);
395b813e 216 }
217}
218
e13ee7ad 219/* callback for eventAdd() (with peer digest locked)
220 * request new digest if our copy is too old or if we lack one;
221 * schedule next check otherwise */
9b7de833 222static void
e13ee7ad 223peerDigestCheck(void *data)
9b7de833 224{
e6ccf245 225 PeerDigest *pd = (PeerDigest *)data;
e13ee7ad 226 time_t req_time;
227
e13ee7ad 228 assert(!pd->flags.requested);
5d9bb360 229
8a6218c6 230 pd->times.next_check = 0; /* unknown */
e13ee7ad 231
fa80a8ef 232 if (!cbdataReferenceValid(pd->peer)) {
e13ee7ad 233 peerDigestNotePeerGone(pd);
234 return;
9b7de833 235 }
e13ee7ad 236 debug(72, 3) ("peerDigestCheck: peer %s:%d\n", pd->peer->host, pd->peer->http_port);
38650cc8 237 debug(72, 3) ("peerDigestCheck: time: %ld, last received: %ld (%+d)\n",
84f2d773 238 (long int) squid_curtime, (long int) pd->times.received, (int) (squid_curtime - pd->times.received));
e13ee7ad 239
240 /* decide when we should send the request:
241 * request now unless too close to other requests */
242 req_time = squid_curtime;
243
244 /* per-peer limit */
245 if (req_time - pd->times.received < PeerDigestReqMinGap) {
246 debug(72, 2) ("peerDigestCheck: %s, avoiding close peer requests (%d < %d secs).\n",
528b2c61 247 pd->host.buf(), (int) (req_time - pd->times.received),
84f2d773 248 (int) PeerDigestReqMinGap);
e13ee7ad 249 req_time = pd->times.received + PeerDigestReqMinGap;
bd890734 250 }
e13ee7ad 251 /* global limit */
252 if (req_time - pd_last_req_time < GlobDigestReqMinGap) {
253 debug(72, 2) ("peerDigestCheck: %s, avoiding close requests (%d < %d secs).\n",
528b2c61 254 pd->host.buf(), (int) (req_time - pd_last_req_time),
84f2d773 255 (int) GlobDigestReqMinGap);
e13ee7ad 256 req_time = pd_last_req_time + GlobDigestReqMinGap;
9b7de833 257 }
e13ee7ad 258 if (req_time <= squid_curtime)
8a6218c6 259 peerDigestRequest(pd); /* will set pd->flags.requested */
e13ee7ad 260 else
261 peerDigestSetCheck(pd, req_time - squid_curtime);
9b7de833 262}
263
2812146b 264CBDATA_TYPE(DigestFetchState);
265
e13ee7ad 266/* ask store for a digest */
9b7de833 267static void
e13ee7ad 268peerDigestRequest(PeerDigest * pd)
9b7de833 269{
e13ee7ad 270 peer *p = pd->peer;
9b7de833 271 StoreEntry *e, *old_e;
272 char *url;
9d486b43 273 const cache_key *key;
9b7de833 274 request_t *req;
275 DigestFetchState *fetch = NULL;
528b2c61 276 StoreIOBuffer tempBuffer;
e13ee7ad 277
278 pd->req_result = NULL;
279 pd->flags.requested = 1;
280
9b7de833 281 /* compute future request components */
7e3ce7b9 282 if (p->digest_url)
283 url = xstrdup(p->digest_url);
284 else
285 url = internalRemoteUri(p->host, p->http_port,
286 "/squid-internal-periodic/", StoreDigestFileName);
287
50a97cee 288 req = urlParse(METHOD_GET, url);
e13ee7ad 289 assert(req);
f66a9ef4 290 key = storeKeyPublicByRequest(req);
291 debug(72, 2) ("peerDigestRequest: %s key: %s\n", url, storeKeyText(key));
e13ee7ad 292
9b7de833 293 /* add custom headers */
2246b732 294 assert(!req->header.len);
4820f62b 295 httpHeaderPutStr(&req->header, HDR_ACCEPT, StoreDigestMimeStr);
296 httpHeaderPutStr(&req->header, HDR_ACCEPT, "text/html");
9bc73deb 297 if (p->login)
298 xstrncpy(req->login, p->login, MAX_LOGIN_SZ);
9b7de833 299 /* create fetch state structure */
2812146b 300 CBDATA_INIT_TYPE(DigestFetchState);
72711e31 301 fetch = cbdataAlloc(DigestFetchState);
903c39e4 302 fetch->request = requestLink(req);
fa80a8ef 303 fetch->pd = cbdataReference(pd);
e13ee7ad 304 fetch->offset = 0;
add2192d 305 fetch->state = DIGEST_READ_REPLY;
e13ee7ad 306
307 /* update timestamps */
9b7de833 308 fetch->start_time = squid_curtime;
e13ee7ad 309 pd->times.requested = squid_curtime;
310 pd_last_req_time = squid_curtime;
311
92695e5e 312 req->flags.cachable = 1;
9b7de833 313 /* the rest is based on clientProcessExpired() */
92695e5e 314 req->flags.refresh = 1;
9d486b43 315 old_e = fetch->old_entry = storeGet(key);
9b7de833 316 if (old_e) {
4b4cd312 317 debug(72, 5) ("peerDigestRequest: found old entry\n");
9b7de833 318 storeLockObject(old_e);
319 storeCreateMemObject(old_e, url, url);
06d2839d 320 fetch->old_sc = storeClientListAdd(old_e, fetch);
9b7de833 321 }
322 e = fetch->entry = storeCreateEntry(url, url, req->flags, req->method);
e13ee7ad 323 assert(EBIT_TEST(e->flags, KEY_PRIVATE));
06d2839d 324 fetch->sc = storeClientListAdd(e, fetch);
9b7de833 325 /* set lastmod to trigger IMS request if possible */
326 if (old_e)
327 e->lastmod = old_e->lastmod;
e13ee7ad 328
9b7de833 329 /* push towards peer cache */
e13ee7ad 330 debug(72, 3) ("peerDigestRequest: forwarding to fwdStart...\n");
7e3ce7b9 331 fwdStart(-1, e, req);
598465f1 332 tempBuffer.offset = 0;
f95db407 333 tempBuffer.length = SM_PAGE_SIZE;
598465f1 334 tempBuffer.data = fetch->buf;
335 storeClientCopy(fetch->sc, e, tempBuffer,
add2192d 336 peerDigestHandleReply, fetch);
9b7de833 337}
338
add2192d 339
340/* Handle the data copying .. */
341
342/*
343 * This routine handles the copy data and then redirects the
344 * copy to a bunch of subfunctions depending upon the copy state.
345 * It also tracks the buffer offset and "seen", since I'm actually
346 * not interested in rewriting everything to suit my little idea.
347 */
9b7de833 348static void
598465f1 349peerDigestHandleReply(void *data, StoreIOBuffer recievedData)
add2192d 350{
e6ccf245 351 DigestFetchState *fetch = (DigestFetchState *)data;
add2192d 352 PeerDigest *pd = fetch->pd;
353 int retsize = -1;
354 digest_read_state_t prevstate;
355 int newsize;
356
598465f1 357 assert(pd && recievedData.data);
358 /* The existing code assumes that the recieved pointer is
359 * where we asked the data to be put
360 */
361 assert(fetch->buf + fetch->bufofs == recievedData.data);
add2192d 362
363 /* Update the buffer size */
598465f1 364 fetch->bufofs += recievedData.length;
add2192d 365
366 assert(fetch->bufofs <= SM_PAGE_SIZE);
367
368 /* If we've fetched enough, return */
369 if (peerDigestFetchedEnough(fetch, fetch->buf, fetch->bufofs, "peerDigestHandleReply"))
370 return;
371
372 /* Call the right function based on the state */
373 /* (Those functions will update the state if needed) */
fa80a8ef 374
e2848e94 375 /* Give us a temporary reference. Some of the calls we make may
376 * try to destroy the fetch structure, and we like to know if they
377 * do
bac6d4bd 378 */
e2848e94 379 fetch = cbdataReference(fetch);
add2192d 380
381 /* Repeat this loop until we're out of data OR the state changes */
382 /* (So keep going if the state has changed and we still have data */
383 do {
fa80a8ef 384 prevstate = fetch->state;
385 switch (fetch->state) {
386 case DIGEST_READ_REPLY:
e6ccf245 387 retsize = peerDigestFetchReply(fetch, fetch->buf, fetch->bufofs);
fa80a8ef 388 break;
389 case DIGEST_READ_HEADERS:
e6ccf245 390 retsize = peerDigestSwapInHeaders(fetch, fetch->buf, fetch->bufofs);
fa80a8ef 391 break;
392 case DIGEST_READ_CBLOCK:
e6ccf245 393 retsize = peerDigestSwapInCBlock(fetch, fetch->buf, fetch->bufofs);
fa80a8ef 394 break;
395 case DIGEST_READ_MASK:
e6ccf245 396 retsize = peerDigestSwapInMask(fetch, fetch->buf, fetch->bufofs);
fa80a8ef 397 break;
398 case DIGEST_READ_NONE:
399 break;
400 case DIGEST_READ_DONE:
401 goto finish;
402 break;
403 default:
404 fatal("Bad digest transfer mode!\n");
405 }
406
407 if (retsize < 0)
408 goto finish;
409 /*
410 * The returned size indicates how much of the buffer was read -
411 * so move the remainder of the buffer to the beginning
412 * and update the bufofs / bufsize
413 */
414 newsize = fetch->bufofs - retsize;
415 xmemmove(fetch->buf, fetch->buf + retsize, fetch->bufofs - newsize);
416 fetch->bufofs = newsize;
add2192d 417
e2848e94 418 } while (cbdataReferenceValid(fetch) && prevstate != fetch->state && fetch->bufofs > 0);
add2192d 419
420 /* Update the copy offset */
598465f1 421 fetch->offset += recievedData.length;
add2192d 422
423 /* Schedule another copy */
fa80a8ef 424 if (cbdataReferenceValid(fetch)) {
528b2c61 425 StoreIOBuffer tempBuffer;
598465f1 426 tempBuffer.offset = fetch->offset;
427 tempBuffer.length = SM_PAGE_SIZE - fetch->bufofs;
428 tempBuffer.data = fetch->buf + fetch->bufofs;
429 storeClientCopy(fetch->sc, fetch->entry, tempBuffer,
430 peerDigestHandleReply, fetch);
add2192d 431 }
fa80a8ef 432 finish:
e2848e94 433 /* Get rid of our reference, we've finished with it for now */
434 cbdataReferenceDone(fetch);
add2192d 435}
436
437
438
439/* wait for full http headers to be received then parse them */
440/*
441 * This routine handles parsing the reply line.
442 * If the reply line indicates an OK, the same data is thrown
443 * to SwapInHeaders(). If the reply line is a NOT_MODIFIED,
444 * we simply stop parsing.
445 */
446static int
9b7de833 447peerDigestFetchReply(void *data, char *buf, ssize_t size)
448{
e6ccf245 449 DigestFetchState *fetch = (DigestFetchState *)data;
e13ee7ad 450 PeerDigest *pd = fetch->pd;
9bc73deb 451 size_t hdr_size;
e13ee7ad 452 assert(pd && buf);
9b7de833 453 assert(!fetch->offset);
e13ee7ad 454
add2192d 455 assert(fetch->state == DIGEST_READ_REPLY);
9b7de833 456 if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestFetchReply"))
add2192d 457 return -1;
e13ee7ad 458
9bc73deb 459 if ((hdr_size = headersEnd(buf, size))) {
9b7de833 460 http_status status;
528b2c61 461 HttpReply const *reply = fetch->entry->getReply();
9b7de833 462 assert(reply);
528b2c61 463 assert (reply->sline.status != 0);
9b7de833 464 status = reply->sline.status;
38650cc8 465 debug(72, 3) ("peerDigestFetchReply: %s status: %d, expires: %ld (%+d)\n",
528b2c61 466 pd->host.buf(), status,
84f2d773 467 (long int) reply->expires, (int) (reply->expires - squid_curtime));
e13ee7ad 468
9b7de833 469 /* this "if" is based on clientHandleIMSReply() */
470 if (status == HTTP_NOT_MODIFIED) {
471 request_t *r = NULL;
472 /* our old entry is fine */
473 assert(fetch->old_entry);
474 if (!fetch->old_entry->mem_obj->request)
475 fetch->old_entry->mem_obj->request = r =
2920225f 476 requestLink(fetch->entry->mem_obj->request);
477 assert(fetch->old_entry->mem_obj->request);
528b2c61 478 httpReplyUpdateOnNotModified((HttpReply *)fetch->old_entry->getReply(), reply);
9b7de833 479 storeTimestampsSet(fetch->old_entry);
480 /* get rid of 304 reply */
06d2839d 481 storeUnregister(fetch->sc, fetch->entry, fetch);
9b7de833 482 storeUnlockObject(fetch->entry);
483 fetch->entry = fetch->old_entry;
484 fetch->old_entry = NULL;
2920225f 485 /* preserve request -- we need its size to update counters */
486 /* requestUnlink(r); */
487 /* fetch->entry->mem_obj->request = NULL; */
4b4cd312 488 } else if (status == HTTP_OK) {
9b7de833 489 /* get rid of old entry if any */
490 if (fetch->old_entry) {
e13ee7ad 491 debug(72, 3) ("peerDigestFetchReply: got new digest, releasing old one\n");
06d2839d 492 storeUnregister(fetch->old_sc, fetch->old_entry, fetch);
9b7de833 493 storeReleaseRequest(fetch->old_entry);
494 storeUnlockObject(fetch->old_entry);
495 fetch->old_entry = NULL;
496 }
497 } else {
498 /* some kind of a bug */
e13ee7ad 499 peerDigestFetchAbort(fetch, buf, httpStatusLineReason(&reply->sline));
fa80a8ef 500 return -1; /* XXX -1 will abort stuff in ReadReply! */
9b7de833 501 }
502 /* must have a ready-to-use store entry if we got here */
e13ee7ad 503 /* can we stay with the old in-memory digest? */
add2192d 504 if (status == HTTP_NOT_MODIFIED && fetch->pd->cd) {
551e60a9 505 peerDigestFetchStop(fetch, buf, "Not modified");
add2192d 506 fetch->state = DIGEST_READ_DONE;
507 } else {
508 fetch->state = DIGEST_READ_HEADERS;
fa80a8ef 509 }
9b7de833 510 } else {
511 /* need more data, do we have space? */
512 if (size >= SM_PAGE_SIZE)
e13ee7ad 513 peerDigestFetchAbort(fetch, buf, "reply header too big");
9b7de833 514 }
add2192d 515
516 /* We don't want to actually ack that we've handled anything,
517 * otherwise SwapInHeaders() won't get the reply line .. */
518 return 0;
9b7de833 519}
520
521/* fetch headers from disk, pass on to SwapInCBlock */
add2192d 522static int
9b7de833 523peerDigestSwapInHeaders(void *data, char *buf, ssize_t size)
524{
e6ccf245 525 DigestFetchState *fetch = (DigestFetchState *)data;
9b7de833 526 size_t hdr_size;
e13ee7ad 527
add2192d 528 assert(fetch->state == DIGEST_READ_HEADERS);
9b7de833 529 if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestSwapInHeaders"))
add2192d 530 return -1;
e13ee7ad 531
9d486b43 532 assert(!fetch->offset);
9b7de833 533 if ((hdr_size = headersEnd(buf, size))) {
528b2c61 534 assert(fetch->entry->getReply());
535 assert (fetch->entry->getReply()->sline.status != 0);
536 if (fetch->entry->getReply()->sline.status != HTTP_OK) {
dba2bbcd 537 debug(72, 1) ("peerDigestSwapInHeaders: %s status %d got cached!\n",
528b2c61 538 fetch->pd->host.buf(), fetch->entry->getReply()->sline.status);
e13ee7ad 539 peerDigestFetchAbort(fetch, buf, "internal status error");
add2192d 540 return -1;
dba2bbcd 541 }
add2192d 542 fetch->state = DIGEST_READ_CBLOCK;
fa80a8ef 543 return hdr_size; /* Say how much data we read */
9b7de833 544 } else {
545 /* need more data, do we have space? */
add2192d 546 if (size >= SM_PAGE_SIZE) {
e13ee7ad 547 peerDigestFetchAbort(fetch, buf, "stored header too big");
add2192d 548 return -1;
549 } else {
fa80a8ef 550 return 0; /* We need to read more to parse .. */
551 }
9b7de833 552 }
add2192d 553 fatal("peerDigestSwapInHeaders() - shouldn't get here!\n");
9b7de833 554}
555
add2192d 556static int
9b7de833 557peerDigestSwapInCBlock(void *data, char *buf, ssize_t size)
558{
e6ccf245 559 DigestFetchState *fetch = (DigestFetchState *)data;
e13ee7ad 560
add2192d 561 assert(fetch->state == DIGEST_READ_CBLOCK);
9b7de833 562 if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestSwapInCBlock"))
add2192d 563 return -1;
e13ee7ad 564
e6ccf245 565 if (size >= (ssize_t)StoreDigestCBlockSize) {
e13ee7ad 566 PeerDigest *pd = fetch->pd;
528b2c61 567 HttpReply const *rep = fetch->entry->getReply();
9d486b43 568
e13ee7ad 569 assert(pd && rep);
570 if (peerDigestSetCBlock(pd, buf)) {
571 /* XXX: soon we will have variable header size */
e13ee7ad 572 /* switch to CD buffer and fetch digest guts */
da407def 573 buf = NULL;
e13ee7ad 574 assert(pd->cd->mask);
fa80a8ef 575 fetch->state = DIGEST_READ_MASK;
576 return StoreDigestCBlockSize;
9b7de833 577 } else {
e13ee7ad 578 peerDigestFetchAbort(fetch, buf, "invalid digest cblock");
fa80a8ef 579 return -1;
9b7de833 580 }
581 } else {
582 /* need more data, do we have space? */
add2192d 583 if (size >= SM_PAGE_SIZE) {
e13ee7ad 584 peerDigestFetchAbort(fetch, buf, "digest cblock too big");
fa80a8ef 585 return -1;
586 } else {
587 return 0; /* We need more data */
588 }
9b7de833 589 }
add2192d 590 fatal("peerDigestSwapInCBlock(): shouldn't get here!\n");
9b7de833 591}
592
add2192d 593static int
9b7de833 594peerDigestSwapInMask(void *data, char *buf, ssize_t size)
595{
e6ccf245 596 DigestFetchState *fetch = (DigestFetchState *)data;
e13ee7ad 597 PeerDigest *pd;
598
e13ee7ad 599 pd = fetch->pd;
600 assert(pd->cd && pd->cd->mask);
9d486b43 601
add2192d 602 /*
603 * NOTENOTENOTENOTENOTE: buf doesn't point to pd->cd->mask anymore!
604 * we need to do the copy ourselves!
605 */
606 xmemcpy(pd->cd->mask + fetch->mask_offset, buf, size);
607
608 /* NOTE! buf points to the middle of pd->cd->mask! */
609 if (peerDigestFetchedEnough(fetch, NULL, size, "peerDigestSwapInMask"))
610 return -1;
611
da407def 612 fetch->mask_offset += size;
e6ccf245 613 if (fetch->mask_offset >= (off_t)pd->cd->mask_size) {
68814ffd 614 debug(72, 2) ("peerDigestSwapInMask: Done! Got %d, expected %d\n",
e13ee7ad 615 fetch->mask_offset, pd->cd->mask_size);
e6ccf245 616 assert(fetch->mask_offset == (off_t)pd->cd->mask_size);
e13ee7ad 617 assert(peerDigestFetchedEnough(fetch, NULL, 0, "peerDigestSwapInMask"));
fa80a8ef 618 return -1; /* XXX! */
e13ee7ad 619 } else {
fa80a8ef 620 /* We always read everything, so return so */
621 return size;
9b7de833 622 }
add2192d 623 fatal("peerDigestSwapInMask(): shouldn't get here!\n");
9b7de833 624}
625
626static int
4b4cd312 627peerDigestFetchedEnough(DigestFetchState * fetch, char *buf, ssize_t size, const char *step_name)
9b7de833 628{
e13ee7ad 629 PeerDigest *pd = NULL;
8a6218c6 630 const char *host = "<unknown>"; /* peer host */
631 const char *reason = NULL; /* reason for completion */
632 const char *no_bug = NULL; /* successful completion if set */
e2848e94 633 const int pdcb_valid = cbdataReferenceValid(fetch->pd);
634 const int pcb_valid = cbdataReferenceValid(fetch->pd->peer);
e13ee7ad 635
636 /* test possible exiting conditions (the same for most steps!)
637 * cases marked with '?!' should not happen */
638
639 if (!reason) {
e2848e94 640 if (!(pd = fetch->pd))
e13ee7ad 641 reason = "peer digest disappeared?!";
bac6d4bd 642#if DONT /* WHY NOT? /HNO */
fa80a8ef 643 else if (!cbdataReferenceValid(pd))
e13ee7ad 644 reason = "invalidated peer digest?!";
5385c86a 645#endif
e13ee7ad 646 else
528b2c61 647 host = pd->host.buf();
e13ee7ad 648 }
e13ee7ad 649 debug(72, 6) ("%s: peer %s, offset: %d size: %d.\n",
bac6d4bd 650 step_name, host,
e2848e94 651 fetch->offset, size);
e13ee7ad 652
653 /* continue checking (with pd and host known and valid) */
654 if (!reason) {
fa80a8ef 655 if (!cbdataReferenceValid(pd->peer))
e13ee7ad 656 reason = "peer disappeared";
657 else if (size < 0)
658 reason = "swap failure";
659 else if (!fetch->entry)
660 reason = "swap aborted?!";
b7fe0ab0 661 else if (EBIT_TEST(fetch->entry->flags, ENTRY_ABORTED))
e13ee7ad 662 reason = "swap aborted";
663 }
e13ee7ad 664 /* continue checking (maybe-successful eof case) */
665 if (!reason && !size) {
666 if (!pd->cd)
667 reason = "null digest?!";
e6ccf245 668 else if (fetch->mask_offset != (off_t)pd->cd->mask_size)
e13ee7ad 669 reason = "premature end of digest?!";
670 else if (!peerDigestUseful(pd))
671 reason = "useless digest";
672 else
673 reason = no_bug = "success";
674 }
e13ee7ad 675 /* finish if we have a reason */
9b7de833 676 if (reason) {
e13ee7ad 677 const int level = strstr(reason, "?!") ? 1 : 3;
678 debug(72, level) ("%s: peer %s, exiting after '%s'\n",
679 step_name, host, reason);
680 peerDigestReqFinish(fetch, buf,
e2848e94 681 1, pdcb_valid, pcb_valid, reason, !no_bug);
e13ee7ad 682 } else {
683 /* paranoid check */
e2848e94 684 assert(pdcb_valid && pcb_valid);
e13ee7ad 685 }
686 return reason != NULL;
687}
688
551e60a9 689/* call this when all callback data is valid and fetch must be stopped but
690 * no error has occurred (e.g. we received 304 reply and reuse old digest) */
691static void
692peerDigestFetchStop(DigestFetchState * fetch, char *buf, const char *reason)
693{
694 assert(reason);
695 debug(72, 2) ("peerDigestFetchStop: peer %s, reason: %s\n",
528b2c61 696 fetch->pd->host.buf(), reason);
551e60a9 697 peerDigestReqFinish(fetch, buf, 1, 1, 1, reason, 0);
698}
699
e13ee7ad 700/* call this when all callback data is valid but something bad happened */
701static void
702peerDigestFetchAbort(DigestFetchState * fetch, char *buf, const char *reason)
703{
551e60a9 704 assert(reason);
705 debug(72, 2) ("peerDigestFetchAbort: peer %s, reason: %s\n",
528b2c61 706 fetch->pd->host.buf(), reason);
e13ee7ad 707 peerDigestReqFinish(fetch, buf, 1, 1, 1, reason, 1);
708}
709
710/* complete the digest transfer, update stats, unlock/release everything */
711static void
712peerDigestReqFinish(DigestFetchState * fetch, char *buf,
8a6218c6 713 int fcb_valid, int pdcb_valid, int pcb_valid,
e13ee7ad 714 const char *reason, int err)
715{
716 assert(reason);
717
718 /* must go before peerDigestPDFinish */
719 if (pdcb_valid) {
720 fetch->pd->flags.requested = 0;
721 fetch->pd->req_result = reason;
722 }
e13ee7ad 723 /* schedule next check if peer is still out there */
724 if (pcb_valid) {
725 PeerDigest *pd = fetch->pd;
726 if (err) {
727 pd->times.retry_delay = peerDigestIncDelay(pd);
728 peerDigestSetCheck(pd, pd->times.retry_delay);
9d486b43 729 } else {
e13ee7ad 730 pd->times.retry_delay = 0;
731 peerDigestSetCheck(pd, peerDigestNewDelay(fetch->entry));
9d486b43 732 }
9b7de833 733 }
e13ee7ad 734 /* note: order is significant */
735 if (fcb_valid)
736 peerDigestFetchSetStats(fetch);
737 if (pdcb_valid)
738 peerDigestPDFinish(fetch, pcb_valid, err);
739 if (fcb_valid)
740 peerDigestFetchFinish(fetch, err);
9b7de833 741}
742
e13ee7ad 743
744/* destroys digest if peer disappeared
745 * must be called only when fetch and pd cbdata are valid */
9b7de833 746static void
e13ee7ad 747peerDigestPDFinish(DigestFetchState * fetch, int pcb_valid, int err)
9b7de833 748{
e13ee7ad 749 PeerDigest *pd = fetch->pd;
528b2c61 750 const char *host = pd->host.buf();
e13ee7ad 751
752 pd->times.received = squid_curtime;
753 pd->times.req_delay = fetch->resp_time;
8a6218c6 754 kb_incr(&pd->stats.sent.kbytes, (size_t) fetch->sent.bytes);
755 kb_incr(&pd->stats.recv.kbytes, (size_t) fetch->recv.bytes);
e13ee7ad 756 pd->stats.sent.msgs += fetch->sent.msg;
757 pd->stats.recv.msgs += fetch->recv.msg;
758
759 if (err) {
760 debug(72, 1) ("%sdisabling (%s) digest from %s\n",
761 pcb_valid ? "temporary " : "",
762 pd->req_result, host);
763
764 if (pd->cd) {
765 cacheDigestDestroy(pd->cd);
766 pd->cd = NULL;
767 }
e13ee7ad 768 pd->flags.usable = 0;
769
770 if (!pcb_valid)
771 peerDigestNotePeerGone(pd);
772 } else {
773 assert(pcb_valid);
774
775 pd->flags.usable = 1;
776
777 /* XXX: ugly condition, but how? */
778 if (fetch->entry->store_status == STORE_OK)
779 debug(72, 2) ("re-used old digest from %s\n", host);
780 else
781 debug(72, 2) ("received valid digest from %s\n", host);
d1cdaa16 782 }
fa80a8ef 783 cbdataReferenceDone(fetch->pd);
e13ee7ad 784}
785
786/* free fetch state structures
787 * must be called only when fetch cbdata is valid */
788static void
789peerDigestFetchFinish(DigestFetchState * fetch, int err)
790{
791 assert(fetch->entry && fetch->request);
792
9b7de833 793 if (fetch->old_entry) {
4b4cd312 794 debug(72, 2) ("peerDigestFetchFinish: deleting old entry\n");
06d2839d 795 storeUnregister(fetch->sc, fetch->old_entry, fetch);
9b7de833 796 storeReleaseRequest(fetch->old_entry);
797 storeUnlockObject(fetch->old_entry);
798 fetch->old_entry = NULL;
799 }
1543ab6c 800 /* update global stats */
83704487 801 kb_incr(&statCounter.cd.kbytes_sent, (size_t) fetch->sent.bytes);
802 kb_incr(&statCounter.cd.kbytes_recv, (size_t) fetch->recv.bytes);
803 statCounter.cd.msgs_sent += fetch->sent.msg;
804 statCounter.cd.msgs_recv += fetch->recv.msg;
e13ee7ad 805
1543ab6c 806 /* unlock everything */
06d2839d 807 storeUnregister(fetch->sc, fetch->entry, fetch);
9b7de833 808 storeUnlockObject(fetch->entry);
903c39e4 809 requestUnlink(fetch->request);
9b7de833 810 fetch->entry = NULL;
903c39e4 811 fetch->request = NULL;
3855c318 812 assert(fetch->pd == NULL);
9b7de833 813 cbdataFree(fetch);
9b7de833 814}
815
e13ee7ad 816/* calculate fetch stats after completion */
817static void
818peerDigestFetchSetStats(DigestFetchState * fetch)
819{
820 MemObject *mem;
821 assert(fetch->entry && fetch->request);
822
823 mem = fetch->entry->mem_obj;
824 assert(mem);
825
826 /* XXX: outgoing numbers are not precise */
827 /* XXX: we must distinguish between 304 hits and misses here */
828 fetch->sent.bytes = httpRequestPrefixLen(fetch->request);
528b2c61 829 /* XXX: this is slightly wrong: we don't KNOW that the entire memobject
830 * was fetched. We only know how big it is
831 */
832 fetch->recv.bytes = mem->size();
e13ee7ad 833 fetch->sent.msg = fetch->recv.msg = 1;
834 fetch->expires = fetch->entry->expires;
835 fetch->resp_time = squid_curtime - fetch->start_time;
836
837 debug(72, 3) ("peerDigestFetchFinish: recv %d bytes in %d secs\n",
84f2d773 838 fetch->recv.bytes, (int) fetch->resp_time);
38650cc8 839 debug(72, 3) ("peerDigestFetchFinish: expires: %ld (%+d), lmt: %ld (%+d)\n",
84f2d773 840 (long int) fetch->expires, (int) (fetch->expires - squid_curtime),
841 (long int) fetch->entry->lastmod, (int) (fetch->entry->lastmod - squid_curtime));
e13ee7ad 842}
843
844
9b7de833 845static int
8a6218c6 846peerDigestSetCBlock(PeerDigest * pd, const char *buf)
9b7de833 847{
848 StoreDigestCBlock cblock;
849 int freed_size = 0;
528b2c61 850 const char *host = pd->host.buf();
e13ee7ad 851
9b7de833 852 xmemcpy(&cblock, buf, sizeof(cblock));
853 /* network -> host conversions */
854 cblock.ver.current = ntohs(cblock.ver.current);
855 cblock.ver.required = ntohs(cblock.ver.required);
856 cblock.capacity = ntohl(cblock.capacity);
857 cblock.count = ntohl(cblock.count);
858 cblock.del_count = ntohl(cblock.del_count);
859 cblock.mask_size = ntohl(cblock.mask_size);
4b4cd312 860 debug(72, 2) ("got digest cblock from %s; ver: %d (req: %d)\n",
e13ee7ad 861 host, (int) cblock.ver.current, (int) cblock.ver.required);
4b4cd312 862 debug(72, 2) ("\t size: %d bytes, e-cnt: %d, e-util: %d%%\n",
9b7de833 863 cblock.mask_size, cblock.count,
864 xpercentInt(cblock.count, cblock.capacity));
6106c6fc 865 /* check version requirements (both ways) */
9b7de833 866 if (cblock.ver.required > CacheDigestVer.current) {
4b4cd312 867 debug(72, 1) ("%s digest requires version %d; have: %d\n",
e13ee7ad 868 host, cblock.ver.required, CacheDigestVer.current);
9b7de833 869 return 0;
870 }
6106c6fc 871 if (cblock.ver.current < CacheDigestVer.required) {
4b4cd312 872 debug(72, 1) ("%s digest is version %d; we require: %d\n",
e13ee7ad 873 host, cblock.ver.current, CacheDigestVer.required);
6106c6fc 874 return 0;
875 }
9b7de833 876 /* check consistency */
4b4cd312 877 if (cblock.ver.required > cblock.ver.current ||
d1cdaa16 878 cblock.mask_size <= 0 || cblock.capacity <= 0 ||
879 cblock.bits_per_entry <= 0 || cblock.hash_func_count <= 0) {
e13ee7ad 880 debug(72, 0) ("%s digest cblock is corrupted.\n", host);
9b7de833 881 return 0;
882 }
d1cdaa16 883 /* check consistency further */
e6ccf245 884 if ((size_t)cblock.mask_size != cacheDigestCalcMaskSize(cblock.capacity, cblock.bits_per_entry)) {
4b4cd312 885 debug(72, 0) ("%s digest cblock is corrupted (mask size mismatch: %d ? %d).\n",
e13ee7ad 886 host, cblock.mask_size, cacheDigestCalcMaskSize(cblock.capacity, cblock.bits_per_entry));
d1cdaa16 887 return 0;
888 }
889 /* there are some things we cannot do yet */
890 if (cblock.hash_func_count != CacheDigestHashFuncCount) {
4b4cd312 891 debug(72, 0) ("%s digest: unsupported #hash functions: %d ? %d.\n",
e13ee7ad 892 host, cblock.hash_func_count, CacheDigestHashFuncCount);
d1cdaa16 893 return 0;
894 }
9b7de833 895 /*
896 * no cblock bugs below this point
897 */
898 /* check size changes */
e6ccf245 899 if (pd->cd && cblock.mask_size != (ssize_t)pd->cd->mask_size) {
4b4cd312 900 debug(72, 2) ("%s digest changed size: %d -> %d\n",
8a6218c6 901 host, cblock.mask_size, pd->cd->mask_size);
e13ee7ad 902 freed_size = pd->cd->mask_size;
903 cacheDigestDestroy(pd->cd);
904 pd->cd = NULL;
9b7de833 905 }
e13ee7ad 906 if (!pd->cd) {
45694534 907 debug(72, 2) ("creating %s digest; size: %d (%+d) bytes\n",
e13ee7ad 908 host, cblock.mask_size, (int) (cblock.mask_size - freed_size));
909 pd->cd = cacheDigestCreate(cblock.capacity, cblock.bits_per_entry);
9b7de833 910 if (cblock.mask_size >= freed_size)
83704487 911 kb_incr(&statCounter.cd.memory, cblock.mask_size - freed_size);
9b7de833 912 }
e13ee7ad 913 assert(pd->cd);
9b7de833 914 /* these assignments leave us in an inconsistent state until we finish reading the digest */
e13ee7ad 915 pd->cd->count = cblock.count;
916 pd->cd->del_count = cblock.del_count;
9b7de833 917 return 1;
918}
919
9b7de833 920static int
8a6218c6 921peerDigestUseful(const PeerDigest * pd)
9b7de833 922{
d1cdaa16 923 /* TODO: we should calculate the prob of a false hit instead of bit util */
e13ee7ad 924 const int bit_util = cacheDigestBitUtil(pd->cd);
925 if (bit_util > 65) {
4b4cd312 926 debug(72, 0) ("Warning: %s peer digest has too many bits on (%d%%).\n",
528b2c61 927 pd->host.buf(), bit_util);
d1cdaa16 928 return 0;
9b7de833 929 }
930 return 1;
931}
7f6eb0fe 932
e13ee7ad 933static int
934saneDiff(time_t diff)
935{
8a6218c6 936 return abs(diff) > squid_curtime / 2 ? 0 : diff;
e13ee7ad 937}
938
939void
8a6218c6 940peerDigestStatsReport(const PeerDigest * pd, StoreEntry * e)
e13ee7ad 941{
942#define f2s(flag) (pd->flags.flag ? "yes" : "no")
38650cc8 943#define appendTime(tm) storeAppendPrintf(e, "%s\t %10ld\t %+d\t %+d\n", \
944 ""#tm, (long int)pd->times.tm, \
e13ee7ad 945 saneDiff(pd->times.tm - squid_curtime), \
946 saneDiff(pd->times.tm - pd->times.initialized))
947
e13ee7ad 948 assert(pd);
949
528b2c61 950 const char *host = pd->host.buf();
e13ee7ad 951 storeAppendPrintf(e, "\npeer digest from %s\n", host);
952
953 cacheDigestGuessStatsReport(&pd->stats.guess, e, host);
954
955 storeAppendPrintf(e, "\nevent\t timestamp\t secs from now\t secs from init\n");
956 appendTime(initialized);
957 appendTime(needed);
958 appendTime(requested);
959 appendTime(received);
960 appendTime(next_check);
961
962 storeAppendPrintf(e, "peer digest state:\n");
963 storeAppendPrintf(e, "\tneeded: %3s, usable: %3s, requested: %3s\n",
964 f2s(needed), f2s(usable), f2s(requested));
965 storeAppendPrintf(e, "\n\tlast retry delay: %d secs\n",
84f2d773 966 (int) pd->times.retry_delay);
e13ee7ad 967 storeAppendPrintf(e, "\tlast request response time: %d secs\n",
84f2d773 968 (int) pd->times.req_delay);
e13ee7ad 969 storeAppendPrintf(e, "\tlast request result: %s\n",
970 pd->req_result ? pd->req_result : "(none)");
971
972 storeAppendPrintf(e, "\npeer digest traffic:\n");
973 storeAppendPrintf(e, "\trequests sent: %d, volume: %d KB\n",
974 pd->stats.sent.msgs, (int) pd->stats.sent.kbytes.kb);
975 storeAppendPrintf(e, "\treplies recv: %d, volume: %d KB\n",
976 pd->stats.recv.msgs, (int) pd->stats.recv.kbytes.kb);
977
978 storeAppendPrintf(e, "\npeer digest structure:\n");
979 if (pd->cd)
980 cacheDigestReport(pd->cd, host, e);
981 else
982 storeAppendPrintf(e, "\tno in-memory copy\n");
983}
984
7f6eb0fe 985#endif