]> git.ipfire.org Git - thirdparty/squid.git/blame - src/peer_digest.cc
Summary: Merge cleanup code from comms branch.
[thirdparty/squid.git] / src / peer_digest.cc
CommitLineData
9b7de833 1
2/*
62e76326 3 * $Id: peer_digest.cc,v 1.95 2003/02/21 22:50:10 robertc Exp $
9b7de833 4 *
5 * DEBUG: section 72 Peer Digest Routines
6 * AUTHOR: Alex Rousskov
7 *
2b6662ba 8 * SQUID Web Proxy Cache http://www.squid-cache.org/
e25c139f 9 * ----------------------------------------------------------
9b7de833 10 *
2b6662ba 11 * Squid is the result of efforts by numerous individuals from
12 * the Internet community; see the CONTRIBUTORS file for full
13 * details. Many organizations have provided support for Squid's
14 * development; see the SPONSORS file for full details. Squid is
15 * Copyrighted (C) 2001 by the Regents of the University of
16 * California; see the COPYRIGHT file for full details. Squid
17 * incorporates software developed and/or copyrighted by other
18 * sources; see the CREDITS file for full details.
9b7de833 19 *
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
24 *
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
29 *
30 * You should have received a copy of the GNU General Public License
31 * along with this program; if not, write to the Free Software
cbdec147 32 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
e25c139f 33 *
9b7de833 34 */
35
36#include "squid.h"
6cfa8966 37#if USE_CACHE_DIGESTS
7a2e5bb5 38
528b2c61 39#include "Store.h"
40#include "HttpRequest.h"
41#include "HttpReply.h"
42#include "MemObject.h"
598465f1 43#include "StoreClient.h"
44
9b7de833 45/* local types */
46
47/* local prototypes */
e13ee7ad 48static time_t peerDigestIncDelay(const PeerDigest * pd);
49static time_t peerDigestNewDelay(const StoreEntry * e);
50static void peerDigestSetCheck(PeerDigest * pd, time_t delay);
eb16313f 51static void peerDigestClean(PeerDigest *);
e13ee7ad 52static EVH peerDigestCheck;
53static void peerDigestRequest(PeerDigest * pd);
add2192d 54static STCB peerDigestHandleReply;
3dfaa3d2 55static int peerDigestFetchReply(void *, char *, ssize_t);
56static int peerDigestSwapInHeaders(void *, char *, ssize_t);
57static int peerDigestSwapInCBlock(void *, char *, ssize_t);
58static int peerDigestSwapInMask(void *, char *, ssize_t);
4b4cd312 59static int peerDigestFetchedEnough(DigestFetchState * fetch, char *buf, ssize_t size, const char *step_name);
551e60a9 60static void peerDigestFetchStop(DigestFetchState * fetch, char *buf, const char *reason);
e13ee7ad 61static void peerDigestFetchAbort(DigestFetchState * fetch, char *buf, const char *reason);
62static void peerDigestReqFinish(DigestFetchState * fetch, char *buf, int, int, int, const char *reason, int err);
63static void peerDigestPDFinish(DigestFetchState * fetch, int pcb_valid, int err);
64static void peerDigestFetchFinish(DigestFetchState * fetch, int err);
65static void peerDigestFetchSetStats(DigestFetchState * fetch);
66static int peerDigestSetCBlock(PeerDigest * pd, const char *buf);
67static int peerDigestUseful(const PeerDigest * pd);
395b813e 68
9b7de833 69
70/* local constants */
9d486b43 71
9b7de833 72#define StoreDigestCBlockSize sizeof(StoreDigestCBlock)
73
e13ee7ad 74/* min interval for requesting digests from a given peer */
75static const time_t PeerDigestReqMinGap = 5 * 60; /* seconds */
76/* min interval for requesting digests (cumulative request stream) */
77static const time_t GlobDigestReqMinGap = 1 * 60; /* seconds */
bd890734 78
79/* local vars */
9d486b43 80
8a6218c6 81static time_t pd_last_req_time = 0; /* last call to Check */
9b7de833 82
e13ee7ad 83/* initialize peer digest */
84static void
8a6218c6 85peerDigestInit(PeerDigest * pd, peer * p)
9b7de833 86{
e13ee7ad 87 assert(pd && p);
88
89 memset(pd, 0, sizeof(*pd));
90 pd->peer = p;
91 /* if peer disappears, we will know it's name */
528b2c61 92 pd->host = p->host;
e13ee7ad 93
94 pd->times.initialized = squid_curtime;
9b7de833 95}
96
4b4cd312 97static void
8a6218c6 98peerDigestClean(PeerDigest * pd)
9b7de833 99{
e13ee7ad 100 assert(pd);
62e76326 101
e13ee7ad 102 if (pd->cd)
62e76326 103 cacheDigestDestroy(pd->cd);
104
528b2c61 105 pd->host.clean();
9b7de833 106}
107
28c60158 108CBDATA_TYPE(PeerDigest);
109
e13ee7ad 110/* allocate new peer digest, call Init, and lock everything */
111PeerDigest *
8a6218c6 112peerDigestCreate(peer * p)
e13ee7ad 113{
114 PeerDigest *pd;
8a6218c6 115 assert(p);
e13ee7ad 116
28c60158 117 CBDATA_INIT_TYPE(PeerDigest);
72711e31 118 pd = cbdataAlloc(PeerDigest);
e13ee7ad 119 peerDigestInit(pd, p);
e13ee7ad 120
fa80a8ef 121 /* XXX This does not look right, and the same thing again in the caller */
122 return cbdataReference(pd);
e13ee7ad 123}
124
125/* call Clean and free/unlock everything */
2d72d4fd 126static void
8a6218c6 127peerDigestDestroy(PeerDigest * pd)
e13ee7ad 128{
fcbbccfe 129 peer *p;
e13ee7ad 130 assert(pd);
e13ee7ad 131
fa80a8ef 132 /* inform peer (if any) that we are gone */
62e76326 133
bac6d4bd 134 if (cbdataReferenceValidDone(pd->peer, (void **) &p))
62e76326 135 peerNoteDigestGone(p);
e13ee7ad 136
137 peerDigestClean(pd);
62e76326 138
e13ee7ad 139 cbdataFree(pd);
140}
141
142/* called by peer to indicate that somebody actually needs this digest */
143void
8a6218c6 144peerDigestNeeded(PeerDigest * pd)
e13ee7ad 145{
146 assert(pd);
147 assert(!pd->flags.needed);
148 assert(!pd->cd);
149
150 pd->flags.needed = 1;
151 pd->times.needed = squid_curtime;
8a6218c6 152 peerDigestSetCheck(pd, 0); /* check asap */
e13ee7ad 153}
154
155/* currently we do not have a reason to disable without destroying */
8a6218c6 156#if FUTURE_CODE
395b813e 157/* disables peer for good */
158static void
8a6218c6 159peerDigestDisable(PeerDigest * pd)
395b813e 160{
8a6218c6 161 debug(72, 2) ("peerDigestDisable: peer %s disabled for good\n",
62e76326 162 pd->host.buf());
e13ee7ad 163 pd->times.disabled = squid_curtime;
8a6218c6 164 pd->times.next_check = -1; /* never */
e13ee7ad 165 pd->flags.usable = 0;
166
167 if (pd->cd) {
62e76326 168 cacheDigestDestroy(pd->cd);
169 pd->cd = NULL;
e13ee7ad 170 }
62e76326 171
e13ee7ad 172 /* we do not destroy the pd itself to preserve its "history" and stats */
395b813e 173}
62e76326 174
e13ee7ad 175#endif
395b813e 176
e13ee7ad 177/* increment retry delay [after an unsuccessful attempt] */
395b813e 178static time_t
8a6218c6 179peerDigestIncDelay(const PeerDigest * pd)
395b813e 180{
e13ee7ad 181 assert(pd);
182 return pd->times.retry_delay > 0 ?
62e76326 183 2 * pd->times.retry_delay : /* exponential backoff */
184 PeerDigestReqMinGap; /* minimal delay */
395b813e 185}
186
62e76326 187/* artificially increases Expires: setting to avoid race conditions
e13ee7ad 188 * returns the delay till that [increased] expiration time */
00485c29 189static time_t
e13ee7ad 190peerDigestNewDelay(const StoreEntry * e)
00485c29 191{
e13ee7ad 192 assert(e);
62e76326 193
00485c29 194 if (e->expires > 0)
62e76326 195 return e->expires + PeerDigestReqMinGap - squid_curtime;
196
e13ee7ad 197 return PeerDigestReqMinGap;
00485c29 198}
199
e13ee7ad 200/* registers next digest verification */
395b813e 201static void
e13ee7ad 202peerDigestSetCheck(PeerDigest * pd, time_t delay)
395b813e 203{
e13ee7ad 204 eventAdd("peerDigestCheck", peerDigestCheck, pd, (double) delay, 1);
205 pd->times.next_check = squid_curtime + delay;
206 debug(72, 3) ("peerDigestSetCheck: will check peer %s in %d secs\n",
62e76326 207 pd->host.buf(), (int) delay);
e13ee7ad 208}
209
5385c86a 210/*
211 * called when peer is about to disappear or have already disappeared
212 */
e13ee7ad 213void
8a6218c6 214peerDigestNotePeerGone(PeerDigest * pd)
215{
e13ee7ad 216 if (pd->flags.requested) {
62e76326 217 debug(72, 2) ("peerDigest: peer %s gone, will destroy after fetch.\n",
218 pd->host.buf());
219 /* do nothing now, the fetching chain will notice and take action */
395b813e 220 } else {
62e76326 221 debug(72, 2) ("peerDigest: peer %s is gone, destroying now.\n",
222 pd->host.buf());
223 peerDigestDestroy(pd);
395b813e 224 }
225}
226
e13ee7ad 227/* callback for eventAdd() (with peer digest locked)
228 * request new digest if our copy is too old or if we lack one;
229 * schedule next check otherwise */
9b7de833 230static void
e13ee7ad 231peerDigestCheck(void *data)
9b7de833 232{
e6ccf245 233 PeerDigest *pd = (PeerDigest *)data;
e13ee7ad 234 time_t req_time;
235
e13ee7ad 236 assert(!pd->flags.requested);
5d9bb360 237
8a6218c6 238 pd->times.next_check = 0; /* unknown */
e13ee7ad 239
fa80a8ef 240 if (!cbdataReferenceValid(pd->peer)) {
62e76326 241 peerDigestNotePeerGone(pd);
242 return;
9b7de833 243 }
62e76326 244
e13ee7ad 245 debug(72, 3) ("peerDigestCheck: peer %s:%d\n", pd->peer->host, pd->peer->http_port);
38650cc8 246 debug(72, 3) ("peerDigestCheck: time: %ld, last received: %ld (%+d)\n",
62e76326 247 (long int) squid_curtime, (long int) pd->times.received, (int) (squid_curtime - pd->times.received));
e13ee7ad 248
249 /* decide when we should send the request:
250 * request now unless too close to other requests */
251 req_time = squid_curtime;
252
253 /* per-peer limit */
62e76326 254
e13ee7ad 255 if (req_time - pd->times.received < PeerDigestReqMinGap) {
62e76326 256 debug(72, 2) ("peerDigestCheck: %s, avoiding close peer requests (%d < %d secs).\n",
257 pd->host.buf(), (int) (req_time - pd->times.received),
258 (int) PeerDigestReqMinGap);
259 req_time = pd->times.received + PeerDigestReqMinGap;
bd890734 260 }
62e76326 261
e13ee7ad 262 /* global limit */
263 if (req_time - pd_last_req_time < GlobDigestReqMinGap) {
62e76326 264 debug(72, 2) ("peerDigestCheck: %s, avoiding close requests (%d < %d secs).\n",
265 pd->host.buf(), (int) (req_time - pd_last_req_time),
266 (int) GlobDigestReqMinGap);
267 req_time = pd_last_req_time + GlobDigestReqMinGap;
9b7de833 268 }
62e76326 269
e13ee7ad 270 if (req_time <= squid_curtime)
62e76326 271 peerDigestRequest(pd); /* will set pd->flags.requested */
e13ee7ad 272 else
62e76326 273 peerDigestSetCheck(pd, req_time - squid_curtime);
9b7de833 274}
275
2812146b 276CBDATA_TYPE(DigestFetchState);
277
e13ee7ad 278/* ask store for a digest */
9b7de833 279static void
e13ee7ad 280peerDigestRequest(PeerDigest * pd)
9b7de833 281{
e13ee7ad 282 peer *p = pd->peer;
9b7de833 283 StoreEntry *e, *old_e;
284 char *url;
9d486b43 285 const cache_key *key;
9b7de833 286 request_t *req;
287 DigestFetchState *fetch = NULL;
528b2c61 288 StoreIOBuffer tempBuffer;
e13ee7ad 289
290 pd->req_result = NULL;
291 pd->flags.requested = 1;
292
9b7de833 293 /* compute future request components */
62e76326 294
7e3ce7b9 295 if (p->digest_url)
62e76326 296 url = xstrdup(p->digest_url);
7e3ce7b9 297 else
62e76326 298 url = internalRemoteUri(p->host, p->http_port,
299 "/squid-internal-periodic/", StoreDigestFileName);
7e3ce7b9 300
50a97cee 301 req = urlParse(METHOD_GET, url);
62e76326 302
e13ee7ad 303 assert(req);
62e76326 304
f66a9ef4 305 key = storeKeyPublicByRequest(req);
62e76326 306
f66a9ef4 307 debug(72, 2) ("peerDigestRequest: %s key: %s\n", url, storeKeyText(key));
e13ee7ad 308
9b7de833 309 /* add custom headers */
2246b732 310 assert(!req->header.len);
62e76326 311
4820f62b 312 httpHeaderPutStr(&req->header, HDR_ACCEPT, StoreDigestMimeStr);
62e76326 313
4820f62b 314 httpHeaderPutStr(&req->header, HDR_ACCEPT, "text/html");
62e76326 315
9bc73deb 316 if (p->login)
62e76326 317 xstrncpy(req->login, p->login, MAX_LOGIN_SZ);
318
9b7de833 319 /* create fetch state structure */
2812146b 320 CBDATA_INIT_TYPE(DigestFetchState);
62e76326 321
72711e31 322 fetch = cbdataAlloc(DigestFetchState);
62e76326 323
903c39e4 324 fetch->request = requestLink(req);
62e76326 325
fa80a8ef 326 fetch->pd = cbdataReference(pd);
62e76326 327
e13ee7ad 328 fetch->offset = 0;
62e76326 329
add2192d 330 fetch->state = DIGEST_READ_REPLY;
e13ee7ad 331
332 /* update timestamps */
9b7de833 333 fetch->start_time = squid_curtime;
62e76326 334
e13ee7ad 335 pd->times.requested = squid_curtime;
62e76326 336
e13ee7ad 337 pd_last_req_time = squid_curtime;
338
92695e5e 339 req->flags.cachable = 1;
62e76326 340
9b7de833 341 /* the rest is based on clientProcessExpired() */
92695e5e 342 req->flags.refresh = 1;
62e76326 343
9d486b43 344 old_e = fetch->old_entry = storeGet(key);
62e76326 345
9b7de833 346 if (old_e) {
62e76326 347 debug(72, 5) ("peerDigestRequest: found old entry\n");
348 storeLockObject(old_e);
349 storeCreateMemObject(old_e, url, url);
350 fetch->old_sc = storeClientListAdd(old_e, fetch);
9b7de833 351 }
62e76326 352
9b7de833 353 e = fetch->entry = storeCreateEntry(url, url, req->flags, req->method);
e13ee7ad 354 assert(EBIT_TEST(e->flags, KEY_PRIVATE));
06d2839d 355 fetch->sc = storeClientListAdd(e, fetch);
9b7de833 356 /* set lastmod to trigger IMS request if possible */
62e76326 357
9b7de833 358 if (old_e)
62e76326 359 e->lastmod = old_e->lastmod;
e13ee7ad 360
9b7de833 361 /* push towards peer cache */
e13ee7ad 362 debug(72, 3) ("peerDigestRequest: forwarding to fwdStart...\n");
62e76326 363
7e3ce7b9 364 fwdStart(-1, e, req);
62e76326 365
598465f1 366 tempBuffer.offset = 0;
62e76326 367
f95db407 368 tempBuffer.length = SM_PAGE_SIZE;
62e76326 369
598465f1 370 tempBuffer.data = fetch->buf;
62e76326 371
598465f1 372 storeClientCopy(fetch->sc, e, tempBuffer,
62e76326 373 peerDigestHandleReply, fetch);
9b7de833 374}
375
add2192d 376
377/* Handle the data copying .. */
378
379/*
380 * This routine handles the copy data and then redirects the
381 * copy to a bunch of subfunctions depending upon the copy state.
382 * It also tracks the buffer offset and "seen", since I'm actually
383 * not interested in rewriting everything to suit my little idea.
384 */
9b7de833 385static void
598465f1 386peerDigestHandleReply(void *data, StoreIOBuffer recievedData)
add2192d 387{
e6ccf245 388 DigestFetchState *fetch = (DigestFetchState *)data;
add2192d 389 PeerDigest *pd = fetch->pd;
390 int retsize = -1;
391 digest_read_state_t prevstate;
392 int newsize;
393
598465f1 394 assert(pd && recievedData.data);
395 /* The existing code assumes that the recieved pointer is
396 * where we asked the data to be put
397 */
398 assert(fetch->buf + fetch->bufofs == recievedData.data);
add2192d 399
400 /* Update the buffer size */
598465f1 401 fetch->bufofs += recievedData.length;
add2192d 402
403 assert(fetch->bufofs <= SM_PAGE_SIZE);
404
405 /* If we've fetched enough, return */
62e76326 406
add2192d 407 if (peerDigestFetchedEnough(fetch, fetch->buf, fetch->bufofs, "peerDigestHandleReply"))
62e76326 408 return;
add2192d 409
410 /* Call the right function based on the state */
411 /* (Those functions will update the state if needed) */
fa80a8ef 412
e2848e94 413 /* Give us a temporary reference. Some of the calls we make may
414 * try to destroy the fetch structure, and we like to know if they
415 * do
bac6d4bd 416 */
e2848e94 417 fetch = cbdataReference(fetch);
add2192d 418
419 /* Repeat this loop until we're out of data OR the state changes */
420 /* (So keep going if the state has changed and we still have data */
421 do {
62e76326 422 prevstate = fetch->state;
423
424 switch (fetch->state) {
425
426 case DIGEST_READ_REPLY:
427 retsize = peerDigestFetchReply(fetch, fetch->buf, fetch->bufofs);
428 break;
429
430 case DIGEST_READ_HEADERS:
431 retsize = peerDigestSwapInHeaders(fetch, fetch->buf, fetch->bufofs);
432 break;
433
434 case DIGEST_READ_CBLOCK:
435 retsize = peerDigestSwapInCBlock(fetch, fetch->buf, fetch->bufofs);
436 break;
437
438 case DIGEST_READ_MASK:
439 retsize = peerDigestSwapInMask(fetch, fetch->buf, fetch->bufofs);
440 break;
441
442 case DIGEST_READ_NONE:
443 break;
444
445 case DIGEST_READ_DONE:
446 goto finish;
447 break;
448
449 default:
450 fatal("Bad digest transfer mode!\n");
451 }
452
453 if (retsize < 0)
454 goto finish;
455
456 /*
457 * The returned size indicates how much of the buffer was read -
458 * so move the remainder of the buffer to the beginning
459 * and update the bufofs / bufsize
460 */
461 newsize = fetch->bufofs - retsize;
462
463 xmemmove(fetch->buf, fetch->buf + retsize, fetch->bufofs - newsize);
464
465 fetch->bufofs = newsize;
add2192d 466
e2848e94 467 } while (cbdataReferenceValid(fetch) && prevstate != fetch->state && fetch->bufofs > 0);
add2192d 468
469 /* Update the copy offset */
598465f1 470 fetch->offset += recievedData.length;
add2192d 471
472 /* Schedule another copy */
fa80a8ef 473 if (cbdataReferenceValid(fetch)) {
62e76326 474 StoreIOBuffer tempBuffer;
475 tempBuffer.offset = fetch->offset;
476 tempBuffer.length = SM_PAGE_SIZE - fetch->bufofs;
477 tempBuffer.data = fetch->buf + fetch->bufofs;
478 storeClientCopy(fetch->sc, fetch->entry, tempBuffer,
479 peerDigestHandleReply, fetch);
add2192d 480 }
62e76326 481
482finish:
e2848e94 483 /* Get rid of our reference, we've finished with it for now */
484 cbdataReferenceDone(fetch);
add2192d 485}
486
487
488
489/* wait for full http headers to be received then parse them */
490/*
491 * This routine handles parsing the reply line.
492 * If the reply line indicates an OK, the same data is thrown
493 * to SwapInHeaders(). If the reply line is a NOT_MODIFIED,
494 * we simply stop parsing.
495 */
496static int
9b7de833 497peerDigestFetchReply(void *data, char *buf, ssize_t size)
498{
e6ccf245 499 DigestFetchState *fetch = (DigestFetchState *)data;
e13ee7ad 500 PeerDigest *pd = fetch->pd;
9bc73deb 501 size_t hdr_size;
e13ee7ad 502 assert(pd && buf);
9b7de833 503 assert(!fetch->offset);
e13ee7ad 504
add2192d 505 assert(fetch->state == DIGEST_READ_REPLY);
62e76326 506
9b7de833 507 if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestFetchReply"))
62e76326 508 return -1;
e13ee7ad 509
9bc73deb 510 if ((hdr_size = headersEnd(buf, size))) {
62e76326 511 http_status status;
512 HttpReply const *reply = fetch->entry->getReply();
513 assert(reply);
514 assert (reply->sline.status != 0);
515 status = reply->sline.status;
516 debug(72, 3) ("peerDigestFetchReply: %s status: %d, expires: %ld (%+d)\n",
517 pd->host.buf(), status,
518 (long int) reply->expires, (int) (reply->expires - squid_curtime));
519
520 /* this "if" is based on clientHandleIMSReply() */
521
522 if (status == HTTP_NOT_MODIFIED) {
523 request_t *r = NULL;
524 /* our old entry is fine */
525 assert(fetch->old_entry);
526
527 if (!fetch->old_entry->mem_obj->request)
528 fetch->old_entry->mem_obj->request = r =
529 requestLink(fetch->entry->mem_obj->request);
530
531 assert(fetch->old_entry->mem_obj->request);
532
533 httpReplyUpdateOnNotModified((HttpReply *)fetch->old_entry->getReply(), reply);
534
535 storeTimestampsSet(fetch->old_entry);
536
537 /* get rid of 304 reply */
538 storeUnregister(fetch->sc, fetch->entry, fetch);
539
540 storeUnlockObject(fetch->entry);
541
542 fetch->entry = fetch->old_entry;
543
544 fetch->old_entry = NULL;
545
546 /* preserve request -- we need its size to update counters */
547 /* requestUnlink(r); */
548 /* fetch->entry->mem_obj->request = NULL; */
549 } else if (status == HTTP_OK) {
550 /* get rid of old entry if any */
551
552 if (fetch->old_entry) {
553 debug(72, 3) ("peerDigestFetchReply: got new digest, releasing old one\n");
554 storeUnregister(fetch->old_sc, fetch->old_entry, fetch);
555 storeReleaseRequest(fetch->old_entry);
556 storeUnlockObject(fetch->old_entry);
557 fetch->old_entry = NULL;
558 }
559 } else {
560 /* some kind of a bug */
561 peerDigestFetchAbort(fetch, buf, httpStatusLineReason(&reply->sline));
562 return -1; /* XXX -1 will abort stuff in ReadReply! */
563 }
564
565 /* must have a ready-to-use store entry if we got here */
566 /* can we stay with the old in-memory digest? */
567 if (status == HTTP_NOT_MODIFIED && fetch->pd->cd) {
568 peerDigestFetchStop(fetch, buf, "Not modified");
569 fetch->state = DIGEST_READ_DONE;
570 } else {
571 fetch->state = DIGEST_READ_HEADERS;
572 }
9b7de833 573 } else {
62e76326 574 /* need more data, do we have space? */
575
576 if (size >= SM_PAGE_SIZE)
577 peerDigestFetchAbort(fetch, buf, "reply header too big");
9b7de833 578 }
add2192d 579
580 /* We don't want to actually ack that we've handled anything,
581 * otherwise SwapInHeaders() won't get the reply line .. */
582 return 0;
9b7de833 583}
584
585/* fetch headers from disk, pass on to SwapInCBlock */
add2192d 586static int
9b7de833 587peerDigestSwapInHeaders(void *data, char *buf, ssize_t size)
588{
e6ccf245 589 DigestFetchState *fetch = (DigestFetchState *)data;
9b7de833 590 size_t hdr_size;
e13ee7ad 591
add2192d 592 assert(fetch->state == DIGEST_READ_HEADERS);
62e76326 593
9b7de833 594 if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestSwapInHeaders"))
62e76326 595 return -1;
e13ee7ad 596
9d486b43 597 assert(!fetch->offset);
62e76326 598
9b7de833 599 if ((hdr_size = headersEnd(buf, size))) {
62e76326 600 assert(fetch->entry->getReply());
601 assert (fetch->entry->getReply()->sline.status != 0);
602
603 if (fetch->entry->getReply()->sline.status != HTTP_OK) {
604 debug(72, 1) ("peerDigestSwapInHeaders: %s status %d got cached!\n",
605 fetch->pd->host.buf(), fetch->entry->getReply()->sline.status);
606 peerDigestFetchAbort(fetch, buf, "internal status error");
607 return -1;
608 }
609
610 fetch->state = DIGEST_READ_CBLOCK;
611 return hdr_size; /* Say how much data we read */
9b7de833 612 } else {
62e76326 613 /* need more data, do we have space? */
614
615 if (size >= SM_PAGE_SIZE) {
616 peerDigestFetchAbort(fetch, buf, "stored header too big");
617 return -1;
618 } else {
619 return 0; /* We need to read more to parse .. */
620 }
9b7de833 621 }
62e76326 622
add2192d 623 fatal("peerDigestSwapInHeaders() - shouldn't get here!\n");
9b7de833 624}
625
add2192d 626static int
9b7de833 627peerDigestSwapInCBlock(void *data, char *buf, ssize_t size)
628{
e6ccf245 629 DigestFetchState *fetch = (DigestFetchState *)data;
e13ee7ad 630
add2192d 631 assert(fetch->state == DIGEST_READ_CBLOCK);
62e76326 632
9b7de833 633 if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestSwapInCBlock"))
62e76326 634 return -1;
e13ee7ad 635
e6ccf245 636 if (size >= (ssize_t)StoreDigestCBlockSize) {
62e76326 637 PeerDigest *pd = fetch->pd;
638 HttpReply const *rep = fetch->entry->getReply();
639
640 assert(pd && rep);
641
642 if (peerDigestSetCBlock(pd, buf)) {
643 /* XXX: soon we will have variable header size */
644 /* switch to CD buffer and fetch digest guts */
645 buf = NULL;
646 assert(pd->cd->mask);
647 fetch->state = DIGEST_READ_MASK;
648 return StoreDigestCBlockSize;
649 } else {
650 peerDigestFetchAbort(fetch, buf, "invalid digest cblock");
651 return -1;
652 }
9b7de833 653 } else {
62e76326 654 /* need more data, do we have space? */
655
656 if (size >= SM_PAGE_SIZE) {
657 peerDigestFetchAbort(fetch, buf, "digest cblock too big");
658 return -1;
659 } else {
660 return 0; /* We need more data */
661 }
9b7de833 662 }
62e76326 663
add2192d 664 fatal("peerDigestSwapInCBlock(): shouldn't get here!\n");
9b7de833 665}
666
add2192d 667static int
9b7de833 668peerDigestSwapInMask(void *data, char *buf, ssize_t size)
669{
e6ccf245 670 DigestFetchState *fetch = (DigestFetchState *)data;
e13ee7ad 671 PeerDigest *pd;
672
e13ee7ad 673 pd = fetch->pd;
674 assert(pd->cd && pd->cd->mask);
9d486b43 675
add2192d 676 /*
677 * NOTENOTENOTENOTENOTE: buf doesn't point to pd->cd->mask anymore!
678 * we need to do the copy ourselves!
679 */
680 xmemcpy(pd->cd->mask + fetch->mask_offset, buf, size);
681
682 /* NOTE! buf points to the middle of pd->cd->mask! */
62e76326 683
add2192d 684 if (peerDigestFetchedEnough(fetch, NULL, size, "peerDigestSwapInMask"))
62e76326 685 return -1;
add2192d 686
da407def 687 fetch->mask_offset += size;
62e76326 688
e6ccf245 689 if (fetch->mask_offset >= (off_t)pd->cd->mask_size) {
62e76326 690 debug(72, 2) ("peerDigestSwapInMask: Done! Got %d, expected %d\n",
691 fetch->mask_offset, pd->cd->mask_size);
692 assert(fetch->mask_offset == (off_t)pd->cd->mask_size);
693 assert(peerDigestFetchedEnough(fetch, NULL, 0, "peerDigestSwapInMask"));
694 return -1; /* XXX! */
e13ee7ad 695 } else {
62e76326 696 /* We always read everything, so return so */
697 return size;
9b7de833 698 }
62e76326 699
add2192d 700 fatal("peerDigestSwapInMask(): shouldn't get here!\n");
9b7de833 701}
702
703static int
4b4cd312 704peerDigestFetchedEnough(DigestFetchState * fetch, char *buf, ssize_t size, const char *step_name)
9b7de833 705{
e13ee7ad 706 PeerDigest *pd = NULL;
8a6218c6 707 const char *host = "<unknown>"; /* peer host */
708 const char *reason = NULL; /* reason for completion */
709 const char *no_bug = NULL; /* successful completion if set */
e2848e94 710 const int pdcb_valid = cbdataReferenceValid(fetch->pd);
711 const int pcb_valid = cbdataReferenceValid(fetch->pd->peer);
e13ee7ad 712
713 /* test possible exiting conditions (the same for most steps!)
714 * cases marked with '?!' should not happen */
715
716 if (!reason) {
62e76326 717 if (!(pd = fetch->pd))
718 reason = "peer digest disappeared?!";
719
bac6d4bd 720#if DONT /* WHY NOT? /HNO */
62e76326 721
722 else if (!cbdataReferenceValid(pd))
723 reason = "invalidated peer digest?!";
724
5385c86a 725#endif
62e76326 726
727 else
728 host = pd->host.buf();
e13ee7ad 729 }
62e76326 730
e13ee7ad 731 debug(72, 6) ("%s: peer %s, offset: %d size: %d.\n",
62e76326 732 step_name, host,
733 fetch->offset, size);
e13ee7ad 734
735 /* continue checking (with pd and host known and valid) */
62e76326 736
e13ee7ad 737 if (!reason) {
62e76326 738 if (!cbdataReferenceValid(pd->peer))
739 reason = "peer disappeared";
740 else if (size < 0)
741 reason = "swap failure";
742 else if (!fetch->entry)
743 reason = "swap aborted?!";
744 else if (EBIT_TEST(fetch->entry->flags, ENTRY_ABORTED))
745 reason = "swap aborted";
e13ee7ad 746 }
62e76326 747
e13ee7ad 748 /* continue checking (maybe-successful eof case) */
749 if (!reason && !size) {
62e76326 750 if (!pd->cd)
751 reason = "null digest?!";
752 else if (fetch->mask_offset != (off_t)pd->cd->mask_size)
753 reason = "premature end of digest?!";
754 else if (!peerDigestUseful(pd))
755 reason = "useless digest";
756 else
757 reason = no_bug = "success";
e13ee7ad 758 }
62e76326 759
e13ee7ad 760 /* finish if we have a reason */
9b7de833 761 if (reason) {
62e76326 762 const int level = strstr(reason, "?!") ? 1 : 3;
763 debug(72, level) ("%s: peer %s, exiting after '%s'\n",
764 step_name, host, reason);
765 peerDigestReqFinish(fetch, buf,
766 1, pdcb_valid, pcb_valid, reason, !no_bug);
e13ee7ad 767 } else {
62e76326 768 /* paranoid check */
769 assert(pdcb_valid && pcb_valid);
e13ee7ad 770 }
62e76326 771
e13ee7ad 772 return reason != NULL;
773}
774
551e60a9 775/* call this when all callback data is valid and fetch must be stopped but
776 * no error has occurred (e.g. we received 304 reply and reuse old digest) */
777static void
778peerDigestFetchStop(DigestFetchState * fetch, char *buf, const char *reason)
779{
780 assert(reason);
781 debug(72, 2) ("peerDigestFetchStop: peer %s, reason: %s\n",
62e76326 782 fetch->pd->host.buf(), reason);
551e60a9 783 peerDigestReqFinish(fetch, buf, 1, 1, 1, reason, 0);
784}
785
e13ee7ad 786/* call this when all callback data is valid but something bad happened */
787static void
788peerDigestFetchAbort(DigestFetchState * fetch, char *buf, const char *reason)
789{
551e60a9 790 assert(reason);
791 debug(72, 2) ("peerDigestFetchAbort: peer %s, reason: %s\n",
62e76326 792 fetch->pd->host.buf(), reason);
e13ee7ad 793 peerDigestReqFinish(fetch, buf, 1, 1, 1, reason, 1);
794}
795
796/* complete the digest transfer, update stats, unlock/release everything */
797static void
798peerDigestReqFinish(DigestFetchState * fetch, char *buf,
62e76326 799 int fcb_valid, int pdcb_valid, int pcb_valid,
800 const char *reason, int err)
e13ee7ad 801{
802 assert(reason);
803
804 /* must go before peerDigestPDFinish */
62e76326 805
e13ee7ad 806 if (pdcb_valid) {
62e76326 807 fetch->pd->flags.requested = 0;
808 fetch->pd->req_result = reason;
e13ee7ad 809 }
62e76326 810
e13ee7ad 811 /* schedule next check if peer is still out there */
812 if (pcb_valid) {
62e76326 813 PeerDigest *pd = fetch->pd;
814
815 if (err) {
816 pd->times.retry_delay = peerDigestIncDelay(pd);
817 peerDigestSetCheck(pd, pd->times.retry_delay);
818 } else {
819 pd->times.retry_delay = 0;
820 peerDigestSetCheck(pd, peerDigestNewDelay(fetch->entry));
821 }
9b7de833 822 }
62e76326 823
e13ee7ad 824 /* note: order is significant */
825 if (fcb_valid)
62e76326 826 peerDigestFetchSetStats(fetch);
827
e13ee7ad 828 if (pdcb_valid)
62e76326 829 peerDigestPDFinish(fetch, pcb_valid, err);
830
e13ee7ad 831 if (fcb_valid)
62e76326 832 peerDigestFetchFinish(fetch, err);
9b7de833 833}
834
e13ee7ad 835
836/* destroys digest if peer disappeared
837 * must be called only when fetch and pd cbdata are valid */
9b7de833 838static void
e13ee7ad 839peerDigestPDFinish(DigestFetchState * fetch, int pcb_valid, int err)
9b7de833 840{
e13ee7ad 841 PeerDigest *pd = fetch->pd;
528b2c61 842 const char *host = pd->host.buf();
e13ee7ad 843
844 pd->times.received = squid_curtime;
845 pd->times.req_delay = fetch->resp_time;
8a6218c6 846 kb_incr(&pd->stats.sent.kbytes, (size_t) fetch->sent.bytes);
847 kb_incr(&pd->stats.recv.kbytes, (size_t) fetch->recv.bytes);
e13ee7ad 848 pd->stats.sent.msgs += fetch->sent.msg;
849 pd->stats.recv.msgs += fetch->recv.msg;
850
851 if (err) {
62e76326 852 debug(72, 1) ("%sdisabling (%s) digest from %s\n",
853 pcb_valid ? "temporary " : "",
854 pd->req_result, host);
855
856 if (pd->cd) {
857 cacheDigestDestroy(pd->cd);
858 pd->cd = NULL;
859 }
860
861 pd->flags.usable = 0;
862
863 if (!pcb_valid)
864 peerDigestNotePeerGone(pd);
e13ee7ad 865 } else {
62e76326 866 assert(pcb_valid);
867
868 pd->flags.usable = 1;
e13ee7ad 869
62e76326 870 /* XXX: ugly condition, but how? */
e13ee7ad 871
62e76326 872 if (fetch->entry->store_status == STORE_OK)
873 debug(72, 2) ("re-used old digest from %s\n", host);
874 else
875 debug(72, 2) ("received valid digest from %s\n", host);
d1cdaa16 876 }
62e76326 877
fa80a8ef 878 cbdataReferenceDone(fetch->pd);
e13ee7ad 879}
880
881/* free fetch state structures
882 * must be called only when fetch cbdata is valid */
883static void
884peerDigestFetchFinish(DigestFetchState * fetch, int err)
885{
886 assert(fetch->entry && fetch->request);
887
9b7de833 888 if (fetch->old_entry) {
62e76326 889 debug(72, 2) ("peerDigestFetchFinish: deleting old entry\n");
890 storeUnregister(fetch->sc, fetch->old_entry, fetch);
891 storeReleaseRequest(fetch->old_entry);
892 storeUnlockObject(fetch->old_entry);
893 fetch->old_entry = NULL;
9b7de833 894 }
62e76326 895
1543ab6c 896 /* update global stats */
83704487 897 kb_incr(&statCounter.cd.kbytes_sent, (size_t) fetch->sent.bytes);
62e76326 898
83704487 899 kb_incr(&statCounter.cd.kbytes_recv, (size_t) fetch->recv.bytes);
62e76326 900
83704487 901 statCounter.cd.msgs_sent += fetch->sent.msg;
62e76326 902
83704487 903 statCounter.cd.msgs_recv += fetch->recv.msg;
e13ee7ad 904
1543ab6c 905 /* unlock everything */
06d2839d 906 storeUnregister(fetch->sc, fetch->entry, fetch);
62e76326 907
9b7de833 908 storeUnlockObject(fetch->entry);
62e76326 909
903c39e4 910 requestUnlink(fetch->request);
62e76326 911
9b7de833 912 fetch->entry = NULL;
62e76326 913
903c39e4 914 fetch->request = NULL;
62e76326 915
3855c318 916 assert(fetch->pd == NULL);
62e76326 917
9b7de833 918 cbdataFree(fetch);
9b7de833 919}
920
e13ee7ad 921/* calculate fetch stats after completion */
922static void
923peerDigestFetchSetStats(DigestFetchState * fetch)
924{
925 MemObject *mem;
926 assert(fetch->entry && fetch->request);
927
928 mem = fetch->entry->mem_obj;
929 assert(mem);
930
931 /* XXX: outgoing numbers are not precise */
932 /* XXX: we must distinguish between 304 hits and misses here */
933 fetch->sent.bytes = httpRequestPrefixLen(fetch->request);
528b2c61 934 /* XXX: this is slightly wrong: we don't KNOW that the entire memobject
935 * was fetched. We only know how big it is
936 */
937 fetch->recv.bytes = mem->size();
e13ee7ad 938 fetch->sent.msg = fetch->recv.msg = 1;
939 fetch->expires = fetch->entry->expires;
940 fetch->resp_time = squid_curtime - fetch->start_time;
941
942 debug(72, 3) ("peerDigestFetchFinish: recv %d bytes in %d secs\n",
62e76326 943 fetch->recv.bytes, (int) fetch->resp_time);
38650cc8 944 debug(72, 3) ("peerDigestFetchFinish: expires: %ld (%+d), lmt: %ld (%+d)\n",
62e76326 945 (long int) fetch->expires, (int) (fetch->expires - squid_curtime),
946 (long int) fetch->entry->lastmod, (int) (fetch->entry->lastmod - squid_curtime));
e13ee7ad 947}
948
949
9b7de833 950static int
8a6218c6 951peerDigestSetCBlock(PeerDigest * pd, const char *buf)
9b7de833 952{
953 StoreDigestCBlock cblock;
954 int freed_size = 0;
528b2c61 955 const char *host = pd->host.buf();
e13ee7ad 956
9b7de833 957 xmemcpy(&cblock, buf, sizeof(cblock));
958 /* network -> host conversions */
959 cblock.ver.current = ntohs(cblock.ver.current);
960 cblock.ver.required = ntohs(cblock.ver.required);
961 cblock.capacity = ntohl(cblock.capacity);
962 cblock.count = ntohl(cblock.count);
963 cblock.del_count = ntohl(cblock.del_count);
964 cblock.mask_size = ntohl(cblock.mask_size);
4b4cd312 965 debug(72, 2) ("got digest cblock from %s; ver: %d (req: %d)\n",
62e76326 966 host, (int) cblock.ver.current, (int) cblock.ver.required);
4b4cd312 967 debug(72, 2) ("\t size: %d bytes, e-cnt: %d, e-util: %d%%\n",
62e76326 968 cblock.mask_size, cblock.count,
969 xpercentInt(cblock.count, cblock.capacity));
6106c6fc 970 /* check version requirements (both ways) */
62e76326 971
9b7de833 972 if (cblock.ver.required > CacheDigestVer.current) {
62e76326 973 debug(72, 1) ("%s digest requires version %d; have: %d\n",
974 host, cblock.ver.required, CacheDigestVer.current);
975 return 0;
9b7de833 976 }
62e76326 977
6106c6fc 978 if (cblock.ver.current < CacheDigestVer.required) {
62e76326 979 debug(72, 1) ("%s digest is version %d; we require: %d\n",
980 host, cblock.ver.current, CacheDigestVer.required);
981 return 0;
6106c6fc 982 }
62e76326 983
9b7de833 984 /* check consistency */
4b4cd312 985 if (cblock.ver.required > cblock.ver.current ||
62e76326 986 cblock.mask_size <= 0 || cblock.capacity <= 0 ||
987 cblock.bits_per_entry <= 0 || cblock.hash_func_count <= 0) {
988 debug(72, 0) ("%s digest cblock is corrupted.\n", host);
989 return 0;
9b7de833 990 }
62e76326 991
d1cdaa16 992 /* check consistency further */
e6ccf245 993 if ((size_t)cblock.mask_size != cacheDigestCalcMaskSize(cblock.capacity, cblock.bits_per_entry)) {
62e76326 994 debug(72, 0) ("%s digest cblock is corrupted (mask size mismatch: %d ? %d).\n",
995 host, cblock.mask_size, cacheDigestCalcMaskSize(cblock.capacity, cblock.bits_per_entry));
996 return 0;
d1cdaa16 997 }
62e76326 998
d1cdaa16 999 /* there are some things we cannot do yet */
1000 if (cblock.hash_func_count != CacheDigestHashFuncCount) {
62e76326 1001 debug(72, 0) ("%s digest: unsupported #hash functions: %d ? %d.\n",
1002 host, cblock.hash_func_count, CacheDigestHashFuncCount);
1003 return 0;
d1cdaa16 1004 }
62e76326 1005
9b7de833 1006 /*
1007 * no cblock bugs below this point
1008 */
1009 /* check size changes */
e6ccf245 1010 if (pd->cd && cblock.mask_size != (ssize_t)pd->cd->mask_size) {
62e76326 1011 debug(72, 2) ("%s digest changed size: %d -> %d\n",
1012 host, cblock.mask_size, pd->cd->mask_size);
1013 freed_size = pd->cd->mask_size;
1014 cacheDigestDestroy(pd->cd);
1015 pd->cd = NULL;
9b7de833 1016 }
62e76326 1017
e13ee7ad 1018 if (!pd->cd) {
62e76326 1019 debug(72, 2) ("creating %s digest; size: %d (%+d) bytes\n",
1020 host, cblock.mask_size, (int) (cblock.mask_size - freed_size));
1021 pd->cd = cacheDigestCreate(cblock.capacity, cblock.bits_per_entry);
1022
1023 if (cblock.mask_size >= freed_size)
1024 kb_incr(&statCounter.cd.memory, cblock.mask_size - freed_size);
9b7de833 1025 }
62e76326 1026
e13ee7ad 1027 assert(pd->cd);
9b7de833 1028 /* these assignments leave us in an inconsistent state until we finish reading the digest */
e13ee7ad 1029 pd->cd->count = cblock.count;
1030 pd->cd->del_count = cblock.del_count;
9b7de833 1031 return 1;
1032}
1033
9b7de833 1034static int
8a6218c6 1035peerDigestUseful(const PeerDigest * pd)
9b7de833 1036{
d1cdaa16 1037 /* TODO: we should calculate the prob of a false hit instead of bit util */
e13ee7ad 1038 const int bit_util = cacheDigestBitUtil(pd->cd);
62e76326 1039
e13ee7ad 1040 if (bit_util > 65) {
62e76326 1041 debug(72, 0) ("Warning: %s peer digest has too many bits on (%d%%).\n",
1042 pd->host.buf(), bit_util);
1043 return 0;
9b7de833 1044 }
62e76326 1045
9b7de833 1046 return 1;
1047}
7f6eb0fe 1048
e13ee7ad 1049static int
1050saneDiff(time_t diff)
1051{
8a6218c6 1052 return abs(diff) > squid_curtime / 2 ? 0 : diff;
e13ee7ad 1053}
1054
1055void
8a6218c6 1056peerDigestStatsReport(const PeerDigest * pd, StoreEntry * e)
e13ee7ad 1057{
1058#define f2s(flag) (pd->flags.flag ? "yes" : "no")
38650cc8 1059#define appendTime(tm) storeAppendPrintf(e, "%s\t %10ld\t %+d\t %+d\n", \
1060 ""#tm, (long int)pd->times.tm, \
e13ee7ad 1061 saneDiff(pd->times.tm - squid_curtime), \
1062 saneDiff(pd->times.tm - pd->times.initialized))
1063
e13ee7ad 1064 assert(pd);
1065
528b2c61 1066 const char *host = pd->host.buf();
e13ee7ad 1067 storeAppendPrintf(e, "\npeer digest from %s\n", host);
1068
1069 cacheDigestGuessStatsReport(&pd->stats.guess, e, host);
1070
1071 storeAppendPrintf(e, "\nevent\t timestamp\t secs from now\t secs from init\n");
1072 appendTime(initialized);
1073 appendTime(needed);
1074 appendTime(requested);
1075 appendTime(received);
1076 appendTime(next_check);
1077
1078 storeAppendPrintf(e, "peer digest state:\n");
1079 storeAppendPrintf(e, "\tneeded: %3s, usable: %3s, requested: %3s\n",
62e76326 1080 f2s(needed), f2s(usable), f2s(requested));
e13ee7ad 1081 storeAppendPrintf(e, "\n\tlast retry delay: %d secs\n",
62e76326 1082 (int) pd->times.retry_delay);
e13ee7ad 1083 storeAppendPrintf(e, "\tlast request response time: %d secs\n",
62e76326 1084 (int) pd->times.req_delay);
e13ee7ad 1085 storeAppendPrintf(e, "\tlast request result: %s\n",
62e76326 1086 pd->req_result ? pd->req_result : "(none)");
e13ee7ad 1087
1088 storeAppendPrintf(e, "\npeer digest traffic:\n");
1089 storeAppendPrintf(e, "\trequests sent: %d, volume: %d KB\n",
62e76326 1090 pd->stats.sent.msgs, (int) pd->stats.sent.kbytes.kb);
e13ee7ad 1091 storeAppendPrintf(e, "\treplies recv: %d, volume: %d KB\n",
62e76326 1092 pd->stats.recv.msgs, (int) pd->stats.recv.kbytes.kb);
e13ee7ad 1093
1094 storeAppendPrintf(e, "\npeer digest structure:\n");
62e76326 1095
e13ee7ad 1096 if (pd->cd)
62e76326 1097 cacheDigestReport(pd->cd, host, e);
e13ee7ad 1098 else
62e76326 1099 storeAppendPrintf(e, "\tno in-memory copy\n");
e13ee7ad 1100}
1101
7f6eb0fe 1102#endif