]> git.ipfire.org Git - thirdparty/squid.git/blame - src/peer_digest.cc
add range header, Range objects represent [start,end)
[thirdparty/squid.git] / src / peer_digest.cc
CommitLineData
9b7de833 1
2/*
f95db407 3 * $Id: peer_digest.cc,v 1.93 2002/10/18 22:46:46 hno Exp $
9b7de833 4 *
5 * DEBUG: section 72 Peer Digest Routines
6 * AUTHOR: Alex Rousskov
7 *
2b6662ba 8 * SQUID Web Proxy Cache http://www.squid-cache.org/
e25c139f 9 * ----------------------------------------------------------
9b7de833 10 *
2b6662ba 11 * Squid is the result of efforts by numerous individuals from
12 * the Internet community; see the CONTRIBUTORS file for full
13 * details. Many organizations have provided support for Squid's
14 * development; see the SPONSORS file for full details. Squid is
15 * Copyrighted (C) 2001 by the Regents of the University of
16 * California; see the COPYRIGHT file for full details. Squid
17 * incorporates software developed and/or copyrighted by other
18 * sources; see the CREDITS file for full details.
9b7de833 19 *
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
24 *
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
29 *
30 * You should have received a copy of the GNU General Public License
31 * along with this program; if not, write to the Free Software
cbdec147 32 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
e25c139f 33 *
9b7de833 34 */
35
36#include "squid.h"
e6ccf245 37#include "Store.h"
9b7de833 38
6cfa8966 39#if USE_CACHE_DIGESTS
7a2e5bb5 40
598465f1 41#include "StoreClient.h"
42
9b7de833 43/* local types */
44
45/* local prototypes */
e13ee7ad 46static time_t peerDigestIncDelay(const PeerDigest * pd);
47static time_t peerDigestNewDelay(const StoreEntry * e);
48static void peerDigestSetCheck(PeerDigest * pd, time_t delay);
eb16313f 49static void peerDigestClean(PeerDigest *);
e13ee7ad 50static EVH peerDigestCheck;
51static void peerDigestRequest(PeerDigest * pd);
add2192d 52static STCB peerDigestHandleReply;
3dfaa3d2 53static int peerDigestFetchReply(void *, char *, ssize_t);
54static int peerDigestSwapInHeaders(void *, char *, ssize_t);
55static int peerDigestSwapInCBlock(void *, char *, ssize_t);
56static int peerDigestSwapInMask(void *, char *, ssize_t);
4b4cd312 57static int peerDigestFetchedEnough(DigestFetchState * fetch, char *buf, ssize_t size, const char *step_name);
551e60a9 58static void peerDigestFetchStop(DigestFetchState * fetch, char *buf, const char *reason);
e13ee7ad 59static void peerDigestFetchAbort(DigestFetchState * fetch, char *buf, const char *reason);
60static void peerDigestReqFinish(DigestFetchState * fetch, char *buf, int, int, int, const char *reason, int err);
61static void peerDigestPDFinish(DigestFetchState * fetch, int pcb_valid, int err);
62static void peerDigestFetchFinish(DigestFetchState * fetch, int err);
63static void peerDigestFetchSetStats(DigestFetchState * fetch);
64static int peerDigestSetCBlock(PeerDigest * pd, const char *buf);
65static int peerDigestUseful(const PeerDigest * pd);
395b813e 66
9b7de833 67
68/* local constants */
9d486b43 69
9b7de833 70#define StoreDigestCBlockSize sizeof(StoreDigestCBlock)
71
e13ee7ad 72/* min interval for requesting digests from a given peer */
73static const time_t PeerDigestReqMinGap = 5 * 60; /* seconds */
74/* min interval for requesting digests (cumulative request stream) */
75static const time_t GlobDigestReqMinGap = 1 * 60; /* seconds */
bd890734 76
77/* local vars */
9d486b43 78
8a6218c6 79static time_t pd_last_req_time = 0; /* last call to Check */
9b7de833 80
e13ee7ad 81/* initialize peer digest */
82static void
8a6218c6 83peerDigestInit(PeerDigest * pd, peer * p)
9b7de833 84{
e13ee7ad 85 assert(pd && p);
86
87 memset(pd, 0, sizeof(*pd));
88 pd->peer = p;
89 /* if peer disappears, we will know it's name */
90 stringInit(&pd->host, p->host);
91
92 pd->times.initialized = squid_curtime;
9b7de833 93}
94
4b4cd312 95static void
8a6218c6 96peerDigestClean(PeerDigest * pd)
9b7de833 97{
e13ee7ad 98 assert(pd);
99 if (pd->cd)
100 cacheDigestDestroy(pd->cd);
101 stringClean(&pd->host);
9b7de833 102}
103
28c60158 104CBDATA_TYPE(PeerDigest);
105
e13ee7ad 106/* allocate new peer digest, call Init, and lock everything */
107PeerDigest *
8a6218c6 108peerDigestCreate(peer * p)
e13ee7ad 109{
110 PeerDigest *pd;
8a6218c6 111 assert(p);
e13ee7ad 112
28c60158 113 CBDATA_INIT_TYPE(PeerDigest);
72711e31 114 pd = cbdataAlloc(PeerDigest);
e13ee7ad 115 peerDigestInit(pd, p);
e13ee7ad 116
fa80a8ef 117 /* XXX This does not look right, and the same thing again in the caller */
118 return cbdataReference(pd);
e13ee7ad 119}
120
121/* call Clean and free/unlock everything */
2d72d4fd 122static void
8a6218c6 123peerDigestDestroy(PeerDigest * pd)
e13ee7ad 124{
fcbbccfe 125 peer *p;
e13ee7ad 126 assert(pd);
e13ee7ad 127
fa80a8ef 128 /* inform peer (if any) that we are gone */
bac6d4bd 129 if (cbdataReferenceValidDone(pd->peer, (void **) &p))
fcbbccfe 130 peerNoteDigestGone(p);
e13ee7ad 131
132 peerDigestClean(pd);
133 cbdataFree(pd);
134}
135
136/* called by peer to indicate that somebody actually needs this digest */
137void
8a6218c6 138peerDigestNeeded(PeerDigest * pd)
e13ee7ad 139{
140 assert(pd);
141 assert(!pd->flags.needed);
142 assert(!pd->cd);
143
144 pd->flags.needed = 1;
145 pd->times.needed = squid_curtime;
8a6218c6 146 peerDigestSetCheck(pd, 0); /* check asap */
e13ee7ad 147}
148
149/* currently we do not have a reason to disable without destroying */
8a6218c6 150#if FUTURE_CODE
395b813e 151/* disables peer for good */
152static void
8a6218c6 153peerDigestDisable(PeerDigest * pd)
395b813e 154{
8a6218c6 155 debug(72, 2) ("peerDigestDisable: peer %s disabled for good\n",
e13ee7ad 156 strBuf(pd->host));
157 pd->times.disabled = squid_curtime;
8a6218c6 158 pd->times.next_check = -1; /* never */
e13ee7ad 159 pd->flags.usable = 0;
160
161 if (pd->cd) {
162 cacheDigestDestroy(pd->cd);
163 pd->cd = NULL;
164 }
165 /* we do not destroy the pd itself to preserve its "history" and stats */
395b813e 166}
e13ee7ad 167#endif
395b813e 168
e13ee7ad 169/* increment retry delay [after an unsuccessful attempt] */
395b813e 170static time_t
8a6218c6 171peerDigestIncDelay(const PeerDigest * pd)
395b813e 172{
e13ee7ad 173 assert(pd);
174 return pd->times.retry_delay > 0 ?
8a6218c6 175 2 * pd->times.retry_delay : /* exponential backoff */
176 PeerDigestReqMinGap; /* minimal delay */
395b813e 177}
178
e13ee7ad 179/* artificially increases Expires: setting to avoid race conditions
180 * returns the delay till that [increased] expiration time */
00485c29 181static time_t
e13ee7ad 182peerDigestNewDelay(const StoreEntry * e)
00485c29 183{
e13ee7ad 184 assert(e);
00485c29 185 if (e->expires > 0)
e13ee7ad 186 return e->expires + PeerDigestReqMinGap - squid_curtime;
187 return PeerDigestReqMinGap;
00485c29 188}
189
e13ee7ad 190/* registers next digest verification */
395b813e 191static void
e13ee7ad 192peerDigestSetCheck(PeerDigest * pd, time_t delay)
395b813e 193{
e13ee7ad 194 eventAdd("peerDigestCheck", peerDigestCheck, pd, (double) delay, 1);
195 pd->times.next_check = squid_curtime + delay;
196 debug(72, 3) ("peerDigestSetCheck: will check peer %s in %d secs\n",
84f2d773 197 strBuf(pd->host), (int) delay);
e13ee7ad 198}
199
5385c86a 200/*
201 * called when peer is about to disappear or have already disappeared
202 */
e13ee7ad 203void
8a6218c6 204peerDigestNotePeerGone(PeerDigest * pd)
205{
e13ee7ad 206 if (pd->flags.requested) {
5385c86a 207 debug(72, 2) ("peerDigest: peer %s gone, will destroy after fetch.\n",
208 strBuf(pd->host));
e13ee7ad 209 /* do nothing now, the fetching chain will notice and take action */
395b813e 210 } else {
5385c86a 211 debug(72, 2) ("peerDigest: peer %s is gone, destroying now.\n",
212 strBuf(pd->host));
e13ee7ad 213 peerDigestDestroy(pd);
395b813e 214 }
215}
216
e13ee7ad 217/* callback for eventAdd() (with peer digest locked)
218 * request new digest if our copy is too old or if we lack one;
219 * schedule next check otherwise */
9b7de833 220static void
e13ee7ad 221peerDigestCheck(void *data)
9b7de833 222{
e6ccf245 223 PeerDigest *pd = (PeerDigest *)data;
e13ee7ad 224 time_t req_time;
225
e13ee7ad 226 assert(!pd->flags.requested);
5d9bb360 227
8a6218c6 228 pd->times.next_check = 0; /* unknown */
e13ee7ad 229
fa80a8ef 230 if (!cbdataReferenceValid(pd->peer)) {
e13ee7ad 231 peerDigestNotePeerGone(pd);
232 return;
9b7de833 233 }
e13ee7ad 234 debug(72, 3) ("peerDigestCheck: peer %s:%d\n", pd->peer->host, pd->peer->http_port);
38650cc8 235 debug(72, 3) ("peerDigestCheck: time: %ld, last received: %ld (%+d)\n",
84f2d773 236 (long int) squid_curtime, (long int) pd->times.received, (int) (squid_curtime - pd->times.received));
e13ee7ad 237
238 /* decide when we should send the request:
239 * request now unless too close to other requests */
240 req_time = squid_curtime;
241
242 /* per-peer limit */
243 if (req_time - pd->times.received < PeerDigestReqMinGap) {
244 debug(72, 2) ("peerDigestCheck: %s, avoiding close peer requests (%d < %d secs).\n",
84f2d773 245 strBuf(pd->host), (int) (req_time - pd->times.received),
246 (int) PeerDigestReqMinGap);
e13ee7ad 247 req_time = pd->times.received + PeerDigestReqMinGap;
bd890734 248 }
e13ee7ad 249 /* global limit */
250 if (req_time - pd_last_req_time < GlobDigestReqMinGap) {
251 debug(72, 2) ("peerDigestCheck: %s, avoiding close requests (%d < %d secs).\n",
84f2d773 252 strBuf(pd->host), (int) (req_time - pd_last_req_time),
253 (int) GlobDigestReqMinGap);
e13ee7ad 254 req_time = pd_last_req_time + GlobDigestReqMinGap;
9b7de833 255 }
e13ee7ad 256 if (req_time <= squid_curtime)
8a6218c6 257 peerDigestRequest(pd); /* will set pd->flags.requested */
e13ee7ad 258 else
259 peerDigestSetCheck(pd, req_time - squid_curtime);
9b7de833 260}
261
2812146b 262CBDATA_TYPE(DigestFetchState);
263
e13ee7ad 264/* ask store for a digest */
9b7de833 265static void
e13ee7ad 266peerDigestRequest(PeerDigest * pd)
9b7de833 267{
e13ee7ad 268 peer *p = pd->peer;
9b7de833 269 StoreEntry *e, *old_e;
270 char *url;
9d486b43 271 const cache_key *key;
9b7de833 272 request_t *req;
273 DigestFetchState *fetch = NULL;
598465f1 274 StoreIOBuffer tempBuffer = EMPTYIOBUFFER;
e13ee7ad 275
276 pd->req_result = NULL;
277 pd->flags.requested = 1;
278
9b7de833 279 /* compute future request components */
7e3ce7b9 280 if (p->digest_url)
281 url = xstrdup(p->digest_url);
282 else
283 url = internalRemoteUri(p->host, p->http_port,
284 "/squid-internal-periodic/", StoreDigestFileName);
285
50a97cee 286 req = urlParse(METHOD_GET, url);
e13ee7ad 287 assert(req);
f66a9ef4 288 key = storeKeyPublicByRequest(req);
289 debug(72, 2) ("peerDigestRequest: %s key: %s\n", url, storeKeyText(key));
e13ee7ad 290
9b7de833 291 /* add custom headers */
2246b732 292 assert(!req->header.len);
4820f62b 293 httpHeaderPutStr(&req->header, HDR_ACCEPT, StoreDigestMimeStr);
294 httpHeaderPutStr(&req->header, HDR_ACCEPT, "text/html");
9bc73deb 295 if (p->login)
296 xstrncpy(req->login, p->login, MAX_LOGIN_SZ);
9b7de833 297 /* create fetch state structure */
2812146b 298 CBDATA_INIT_TYPE(DigestFetchState);
72711e31 299 fetch = cbdataAlloc(DigestFetchState);
903c39e4 300 fetch->request = requestLink(req);
fa80a8ef 301 fetch->pd = cbdataReference(pd);
e13ee7ad 302 fetch->offset = 0;
add2192d 303 fetch->state = DIGEST_READ_REPLY;
e13ee7ad 304
305 /* update timestamps */
9b7de833 306 fetch->start_time = squid_curtime;
e13ee7ad 307 pd->times.requested = squid_curtime;
308 pd_last_req_time = squid_curtime;
309
92695e5e 310 req->flags.cachable = 1;
9b7de833 311 /* the rest is based on clientProcessExpired() */
92695e5e 312 req->flags.refresh = 1;
9d486b43 313 old_e = fetch->old_entry = storeGet(key);
9b7de833 314 if (old_e) {
4b4cd312 315 debug(72, 5) ("peerDigestRequest: found old entry\n");
9b7de833 316 storeLockObject(old_e);
317 storeCreateMemObject(old_e, url, url);
06d2839d 318 fetch->old_sc = storeClientListAdd(old_e, fetch);
9b7de833 319 }
320 e = fetch->entry = storeCreateEntry(url, url, req->flags, req->method);
e13ee7ad 321 assert(EBIT_TEST(e->flags, KEY_PRIVATE));
06d2839d 322 fetch->sc = storeClientListAdd(e, fetch);
9b7de833 323 /* set lastmod to trigger IMS request if possible */
324 if (old_e)
325 e->lastmod = old_e->lastmod;
e13ee7ad 326
9b7de833 327 /* push towards peer cache */
e13ee7ad 328 debug(72, 3) ("peerDigestRequest: forwarding to fwdStart...\n");
7e3ce7b9 329 fwdStart(-1, e, req);
598465f1 330 tempBuffer.offset = 0;
f95db407 331 tempBuffer.length = SM_PAGE_SIZE;
598465f1 332 tempBuffer.data = fetch->buf;
333 storeClientCopy(fetch->sc, e, tempBuffer,
add2192d 334 peerDigestHandleReply, fetch);
9b7de833 335}
336
add2192d 337
338/* Handle the data copying .. */
339
340/*
341 * This routine handles the copy data and then redirects the
342 * copy to a bunch of subfunctions depending upon the copy state.
343 * It also tracks the buffer offset and "seen", since I'm actually
344 * not interested in rewriting everything to suit my little idea.
345 */
9b7de833 346static void
598465f1 347peerDigestHandleReply(void *data, StoreIOBuffer recievedData)
add2192d 348{
e6ccf245 349 DigestFetchState *fetch = (DigestFetchState *)data;
add2192d 350 PeerDigest *pd = fetch->pd;
351 int retsize = -1;
352 digest_read_state_t prevstate;
353 int newsize;
354
598465f1 355 assert(pd && recievedData.data);
356 /* The existing code assumes that the recieved pointer is
357 * where we asked the data to be put
358 */
359 assert(fetch->buf + fetch->bufofs == recievedData.data);
add2192d 360
361 /* Update the buffer size */
598465f1 362 fetch->bufofs += recievedData.length;
add2192d 363
364 assert(fetch->bufofs <= SM_PAGE_SIZE);
365
366 /* If we've fetched enough, return */
367 if (peerDigestFetchedEnough(fetch, fetch->buf, fetch->bufofs, "peerDigestHandleReply"))
368 return;
369
370 /* Call the right function based on the state */
371 /* (Those functions will update the state if needed) */
fa80a8ef 372
e2848e94 373 /* Give us a temporary reference. Some of the calls we make may
374 * try to destroy the fetch structure, and we like to know if they
375 * do
bac6d4bd 376 */
e2848e94 377 fetch = cbdataReference(fetch);
add2192d 378
379 /* Repeat this loop until we're out of data OR the state changes */
380 /* (So keep going if the state has changed and we still have data */
381 do {
fa80a8ef 382 prevstate = fetch->state;
383 switch (fetch->state) {
384 case DIGEST_READ_REPLY:
e6ccf245 385 retsize = peerDigestFetchReply(fetch, fetch->buf, fetch->bufofs);
fa80a8ef 386 break;
387 case DIGEST_READ_HEADERS:
e6ccf245 388 retsize = peerDigestSwapInHeaders(fetch, fetch->buf, fetch->bufofs);
fa80a8ef 389 break;
390 case DIGEST_READ_CBLOCK:
e6ccf245 391 retsize = peerDigestSwapInCBlock(fetch, fetch->buf, fetch->bufofs);
fa80a8ef 392 break;
393 case DIGEST_READ_MASK:
e6ccf245 394 retsize = peerDigestSwapInMask(fetch, fetch->buf, fetch->bufofs);
fa80a8ef 395 break;
396 case DIGEST_READ_NONE:
397 break;
398 case DIGEST_READ_DONE:
399 goto finish;
400 break;
401 default:
402 fatal("Bad digest transfer mode!\n");
403 }
404
405 if (retsize < 0)
406 goto finish;
407 /*
408 * The returned size indicates how much of the buffer was read -
409 * so move the remainder of the buffer to the beginning
410 * and update the bufofs / bufsize
411 */
412 newsize = fetch->bufofs - retsize;
413 xmemmove(fetch->buf, fetch->buf + retsize, fetch->bufofs - newsize);
414 fetch->bufofs = newsize;
add2192d 415
e2848e94 416 } while (cbdataReferenceValid(fetch) && prevstate != fetch->state && fetch->bufofs > 0);
add2192d 417
418 /* Update the copy offset */
598465f1 419 fetch->offset += recievedData.length;
add2192d 420
421 /* Schedule another copy */
fa80a8ef 422 if (cbdataReferenceValid(fetch)) {
598465f1 423 StoreIOBuffer tempBuffer = EMPTYIOBUFFER;
424 tempBuffer.offset = fetch->offset;
425 tempBuffer.length = SM_PAGE_SIZE - fetch->bufofs;
426 tempBuffer.data = fetch->buf + fetch->bufofs;
427 storeClientCopy(fetch->sc, fetch->entry, tempBuffer,
428 peerDigestHandleReply, fetch);
add2192d 429 }
fa80a8ef 430 finish:
e2848e94 431 /* Get rid of our reference, we've finished with it for now */
432 cbdataReferenceDone(fetch);
add2192d 433}
434
435
436
437/* wait for full http headers to be received then parse them */
438/*
439 * This routine handles parsing the reply line.
440 * If the reply line indicates an OK, the same data is thrown
441 * to SwapInHeaders(). If the reply line is a NOT_MODIFIED,
442 * we simply stop parsing.
443 */
444static int
9b7de833 445peerDigestFetchReply(void *data, char *buf, ssize_t size)
446{
e6ccf245 447 DigestFetchState *fetch = (DigestFetchState *)data;
e13ee7ad 448 PeerDigest *pd = fetch->pd;
9bc73deb 449 size_t hdr_size;
e13ee7ad 450 assert(pd && buf);
9b7de833 451 assert(!fetch->offset);
e13ee7ad 452
add2192d 453 assert(fetch->state == DIGEST_READ_REPLY);
9b7de833 454 if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestFetchReply"))
add2192d 455 return -1;
e13ee7ad 456
9bc73deb 457 if ((hdr_size = headersEnd(buf, size))) {
9b7de833 458 http_status status;
459 HttpReply *reply = fetch->entry->mem_obj->reply;
460 assert(reply);
9bc73deb 461 httpReplyParse(reply, buf, hdr_size);
9b7de833 462 status = reply->sline.status;
38650cc8 463 debug(72, 3) ("peerDigestFetchReply: %s status: %d, expires: %ld (%+d)\n",
e13ee7ad 464 strBuf(pd->host), status,
84f2d773 465 (long int) reply->expires, (int) (reply->expires - squid_curtime));
e13ee7ad 466
9b7de833 467 /* this "if" is based on clientHandleIMSReply() */
468 if (status == HTTP_NOT_MODIFIED) {
469 request_t *r = NULL;
470 /* our old entry is fine */
471 assert(fetch->old_entry);
472 if (!fetch->old_entry->mem_obj->request)
473 fetch->old_entry->mem_obj->request = r =
2920225f 474 requestLink(fetch->entry->mem_obj->request);
475 assert(fetch->old_entry->mem_obj->request);
9b7de833 476 httpReplyUpdateOnNotModified(fetch->old_entry->mem_obj->reply, reply);
477 storeTimestampsSet(fetch->old_entry);
478 /* get rid of 304 reply */
06d2839d 479 storeUnregister(fetch->sc, fetch->entry, fetch);
9b7de833 480 storeUnlockObject(fetch->entry);
481 fetch->entry = fetch->old_entry;
482 fetch->old_entry = NULL;
2920225f 483 /* preserve request -- we need its size to update counters */
484 /* requestUnlink(r); */
485 /* fetch->entry->mem_obj->request = NULL; */
4b4cd312 486 } else if (status == HTTP_OK) {
9b7de833 487 /* get rid of old entry if any */
488 if (fetch->old_entry) {
e13ee7ad 489 debug(72, 3) ("peerDigestFetchReply: got new digest, releasing old one\n");
06d2839d 490 storeUnregister(fetch->old_sc, fetch->old_entry, fetch);
9b7de833 491 storeReleaseRequest(fetch->old_entry);
492 storeUnlockObject(fetch->old_entry);
493 fetch->old_entry = NULL;
494 }
495 } else {
496 /* some kind of a bug */
e13ee7ad 497 peerDigestFetchAbort(fetch, buf, httpStatusLineReason(&reply->sline));
fa80a8ef 498 return -1; /* XXX -1 will abort stuff in ReadReply! */
9b7de833 499 }
500 /* must have a ready-to-use store entry if we got here */
e13ee7ad 501 /* can we stay with the old in-memory digest? */
add2192d 502 if (status == HTTP_NOT_MODIFIED && fetch->pd->cd) {
551e60a9 503 peerDigestFetchStop(fetch, buf, "Not modified");
add2192d 504 fetch->state = DIGEST_READ_DONE;
505 } else {
506 fetch->state = DIGEST_READ_HEADERS;
fa80a8ef 507 }
9b7de833 508 } else {
509 /* need more data, do we have space? */
510 if (size >= SM_PAGE_SIZE)
e13ee7ad 511 peerDigestFetchAbort(fetch, buf, "reply header too big");
9b7de833 512 }
add2192d 513
514 /* We don't want to actually ack that we've handled anything,
515 * otherwise SwapInHeaders() won't get the reply line .. */
516 return 0;
9b7de833 517}
518
519/* fetch headers from disk, pass on to SwapInCBlock */
add2192d 520static int
9b7de833 521peerDigestSwapInHeaders(void *data, char *buf, ssize_t size)
522{
e6ccf245 523 DigestFetchState *fetch = (DigestFetchState *)data;
9b7de833 524 size_t hdr_size;
e13ee7ad 525
add2192d 526 assert(fetch->state == DIGEST_READ_HEADERS);
9b7de833 527 if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestSwapInHeaders"))
add2192d 528 return -1;
e13ee7ad 529
9d486b43 530 assert(!fetch->offset);
9b7de833 531 if ((hdr_size = headersEnd(buf, size))) {
532 assert(fetch->entry->mem_obj->reply);
533 if (!fetch->entry->mem_obj->reply->sline.status)
9bc73deb 534 httpReplyParse(fetch->entry->mem_obj->reply, buf, hdr_size);
dba2bbcd 535 if (fetch->entry->mem_obj->reply->sline.status != HTTP_OK) {
536 debug(72, 1) ("peerDigestSwapInHeaders: %s status %d got cached!\n",
e13ee7ad 537 strBuf(fetch->pd->host), fetch->entry->mem_obj->reply->sline.status);
538 peerDigestFetchAbort(fetch, buf, "internal status error");
add2192d 539 return -1;
dba2bbcd 540 }
add2192d 541 fetch->state = DIGEST_READ_CBLOCK;
fa80a8ef 542 return hdr_size; /* Say how much data we read */
9b7de833 543 } else {
544 /* need more data, do we have space? */
add2192d 545 if (size >= SM_PAGE_SIZE) {
e13ee7ad 546 peerDigestFetchAbort(fetch, buf, "stored header too big");
add2192d 547 return -1;
548 } else {
fa80a8ef 549 return 0; /* We need to read more to parse .. */
550 }
9b7de833 551 }
add2192d 552 fatal("peerDigestSwapInHeaders() - shouldn't get here!\n");
9b7de833 553}
554
add2192d 555static int
9b7de833 556peerDigestSwapInCBlock(void *data, char *buf, ssize_t size)
557{
e6ccf245 558 DigestFetchState *fetch = (DigestFetchState *)data;
e13ee7ad 559
add2192d 560 assert(fetch->state == DIGEST_READ_CBLOCK);
9b7de833 561 if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestSwapInCBlock"))
add2192d 562 return -1;
e13ee7ad 563
e6ccf245 564 if (size >= (ssize_t)StoreDigestCBlockSize) {
e13ee7ad 565 PeerDigest *pd = fetch->pd;
9d486b43 566 HttpReply *rep = fetch->entry->mem_obj->reply;
9d486b43 567
e13ee7ad 568 assert(pd && rep);
569 if (peerDigestSetCBlock(pd, buf)) {
570 /* XXX: soon we will have variable header size */
e13ee7ad 571 /* switch to CD buffer and fetch digest guts */
da407def 572 buf = NULL;
e13ee7ad 573 assert(pd->cd->mask);
fa80a8ef 574 fetch->state = DIGEST_READ_MASK;
575 return StoreDigestCBlockSize;
9b7de833 576 } else {
e13ee7ad 577 peerDigestFetchAbort(fetch, buf, "invalid digest cblock");
fa80a8ef 578 return -1;
9b7de833 579 }
580 } else {
581 /* need more data, do we have space? */
add2192d 582 if (size >= SM_PAGE_SIZE) {
e13ee7ad 583 peerDigestFetchAbort(fetch, buf, "digest cblock too big");
fa80a8ef 584 return -1;
585 } else {
586 return 0; /* We need more data */
587 }
9b7de833 588 }
add2192d 589 fatal("peerDigestSwapInCBlock(): shouldn't get here!\n");
9b7de833 590}
591
add2192d 592static int
9b7de833 593peerDigestSwapInMask(void *data, char *buf, ssize_t size)
594{
e6ccf245 595 DigestFetchState *fetch = (DigestFetchState *)data;
e13ee7ad 596 PeerDigest *pd;
597
e13ee7ad 598 pd = fetch->pd;
599 assert(pd->cd && pd->cd->mask);
9d486b43 600
add2192d 601 /*
602 * NOTENOTENOTENOTENOTE: buf doesn't point to pd->cd->mask anymore!
603 * we need to do the copy ourselves!
604 */
605 xmemcpy(pd->cd->mask + fetch->mask_offset, buf, size);
606
607 /* NOTE! buf points to the middle of pd->cd->mask! */
608 if (peerDigestFetchedEnough(fetch, NULL, size, "peerDigestSwapInMask"))
609 return -1;
610
da407def 611 fetch->mask_offset += size;
e6ccf245 612 if (fetch->mask_offset >= (off_t)pd->cd->mask_size) {
68814ffd 613 debug(72, 2) ("peerDigestSwapInMask: Done! Got %d, expected %d\n",
e13ee7ad 614 fetch->mask_offset, pd->cd->mask_size);
e6ccf245 615 assert(fetch->mask_offset == (off_t)pd->cd->mask_size);
e13ee7ad 616 assert(peerDigestFetchedEnough(fetch, NULL, 0, "peerDigestSwapInMask"));
fa80a8ef 617 return -1; /* XXX! */
e13ee7ad 618 } else {
fa80a8ef 619 /* We always read everything, so return so */
620 return size;
9b7de833 621 }
add2192d 622 fatal("peerDigestSwapInMask(): shouldn't get here!\n");
9b7de833 623}
624
625static int
4b4cd312 626peerDigestFetchedEnough(DigestFetchState * fetch, char *buf, ssize_t size, const char *step_name)
9b7de833 627{
e13ee7ad 628 PeerDigest *pd = NULL;
8a6218c6 629 const char *host = "<unknown>"; /* peer host */
630 const char *reason = NULL; /* reason for completion */
631 const char *no_bug = NULL; /* successful completion if set */
e2848e94 632 const int pdcb_valid = cbdataReferenceValid(fetch->pd);
633 const int pcb_valid = cbdataReferenceValid(fetch->pd->peer);
e13ee7ad 634
635 /* test possible exiting conditions (the same for most steps!)
636 * cases marked with '?!' should not happen */
637
638 if (!reason) {
e2848e94 639 if (!(pd = fetch->pd))
e13ee7ad 640 reason = "peer digest disappeared?!";
bac6d4bd 641#if DONT /* WHY NOT? /HNO */
fa80a8ef 642 else if (!cbdataReferenceValid(pd))
e13ee7ad 643 reason = "invalidated peer digest?!";
5385c86a 644#endif
e13ee7ad 645 else
646 host = strBuf(pd->host);
647 }
e13ee7ad 648 debug(72, 6) ("%s: peer %s, offset: %d size: %d.\n",
bac6d4bd 649 step_name, host,
e2848e94 650 fetch->offset, size);
e13ee7ad 651
652 /* continue checking (with pd and host known and valid) */
653 if (!reason) {
fa80a8ef 654 if (!cbdataReferenceValid(pd->peer))
e13ee7ad 655 reason = "peer disappeared";
656 else if (size < 0)
657 reason = "swap failure";
658 else if (!fetch->entry)
659 reason = "swap aborted?!";
b7fe0ab0 660 else if (EBIT_TEST(fetch->entry->flags, ENTRY_ABORTED))
e13ee7ad 661 reason = "swap aborted";
662 }
e13ee7ad 663 /* continue checking (maybe-successful eof case) */
664 if (!reason && !size) {
665 if (!pd->cd)
666 reason = "null digest?!";
e6ccf245 667 else if (fetch->mask_offset != (off_t)pd->cd->mask_size)
e13ee7ad 668 reason = "premature end of digest?!";
669 else if (!peerDigestUseful(pd))
670 reason = "useless digest";
671 else
672 reason = no_bug = "success";
673 }
e13ee7ad 674 /* finish if we have a reason */
9b7de833 675 if (reason) {
e13ee7ad 676 const int level = strstr(reason, "?!") ? 1 : 3;
677 debug(72, level) ("%s: peer %s, exiting after '%s'\n",
678 step_name, host, reason);
679 peerDigestReqFinish(fetch, buf,
e2848e94 680 1, pdcb_valid, pcb_valid, reason, !no_bug);
e13ee7ad 681 } else {
682 /* paranoid check */
e2848e94 683 assert(pdcb_valid && pcb_valid);
e13ee7ad 684 }
685 return reason != NULL;
686}
687
551e60a9 688/* call this when all callback data is valid and fetch must be stopped but
689 * no error has occurred (e.g. we received 304 reply and reuse old digest) */
690static void
691peerDigestFetchStop(DigestFetchState * fetch, char *buf, const char *reason)
692{
693 assert(reason);
694 debug(72, 2) ("peerDigestFetchStop: peer %s, reason: %s\n",
695 strBuf(fetch->pd->host), reason);
696 peerDigestReqFinish(fetch, buf, 1, 1, 1, reason, 0);
697}
698
e13ee7ad 699/* call this when all callback data is valid but something bad happened */
700static void
701peerDigestFetchAbort(DigestFetchState * fetch, char *buf, const char *reason)
702{
551e60a9 703 assert(reason);
704 debug(72, 2) ("peerDigestFetchAbort: peer %s, reason: %s\n",
705 strBuf(fetch->pd->host), reason);
e13ee7ad 706 peerDigestReqFinish(fetch, buf, 1, 1, 1, reason, 1);
707}
708
709/* complete the digest transfer, update stats, unlock/release everything */
710static void
711peerDigestReqFinish(DigestFetchState * fetch, char *buf,
8a6218c6 712 int fcb_valid, int pdcb_valid, int pcb_valid,
e13ee7ad 713 const char *reason, int err)
714{
715 assert(reason);
716
717 /* must go before peerDigestPDFinish */
718 if (pdcb_valid) {
719 fetch->pd->flags.requested = 0;
720 fetch->pd->req_result = reason;
721 }
e13ee7ad 722 /* schedule next check if peer is still out there */
723 if (pcb_valid) {
724 PeerDigest *pd = fetch->pd;
725 if (err) {
726 pd->times.retry_delay = peerDigestIncDelay(pd);
727 peerDigestSetCheck(pd, pd->times.retry_delay);
9d486b43 728 } else {
e13ee7ad 729 pd->times.retry_delay = 0;
730 peerDigestSetCheck(pd, peerDigestNewDelay(fetch->entry));
9d486b43 731 }
9b7de833 732 }
e13ee7ad 733 /* note: order is significant */
734 if (fcb_valid)
735 peerDigestFetchSetStats(fetch);
736 if (pdcb_valid)
737 peerDigestPDFinish(fetch, pcb_valid, err);
738 if (fcb_valid)
739 peerDigestFetchFinish(fetch, err);
9b7de833 740}
741
e13ee7ad 742
743/* destroys digest if peer disappeared
744 * must be called only when fetch and pd cbdata are valid */
9b7de833 745static void
e13ee7ad 746peerDigestPDFinish(DigestFetchState * fetch, int pcb_valid, int err)
9b7de833 747{
e13ee7ad 748 PeerDigest *pd = fetch->pd;
749 const char *host = strBuf(pd->host);
750
751 pd->times.received = squid_curtime;
752 pd->times.req_delay = fetch->resp_time;
8a6218c6 753 kb_incr(&pd->stats.sent.kbytes, (size_t) fetch->sent.bytes);
754 kb_incr(&pd->stats.recv.kbytes, (size_t) fetch->recv.bytes);
e13ee7ad 755 pd->stats.sent.msgs += fetch->sent.msg;
756 pd->stats.recv.msgs += fetch->recv.msg;
757
758 if (err) {
759 debug(72, 1) ("%sdisabling (%s) digest from %s\n",
760 pcb_valid ? "temporary " : "",
761 pd->req_result, host);
762
763 if (pd->cd) {
764 cacheDigestDestroy(pd->cd);
765 pd->cd = NULL;
766 }
e13ee7ad 767 pd->flags.usable = 0;
768
769 if (!pcb_valid)
770 peerDigestNotePeerGone(pd);
771 } else {
772 assert(pcb_valid);
773
774 pd->flags.usable = 1;
775
776 /* XXX: ugly condition, but how? */
777 if (fetch->entry->store_status == STORE_OK)
778 debug(72, 2) ("re-used old digest from %s\n", host);
779 else
780 debug(72, 2) ("received valid digest from %s\n", host);
d1cdaa16 781 }
fa80a8ef 782 cbdataReferenceDone(fetch->pd);
e13ee7ad 783}
784
785/* free fetch state structures
786 * must be called only when fetch cbdata is valid */
787static void
788peerDigestFetchFinish(DigestFetchState * fetch, int err)
789{
790 assert(fetch->entry && fetch->request);
791
9b7de833 792 if (fetch->old_entry) {
4b4cd312 793 debug(72, 2) ("peerDigestFetchFinish: deleting old entry\n");
06d2839d 794 storeUnregister(fetch->sc, fetch->old_entry, fetch);
9b7de833 795 storeReleaseRequest(fetch->old_entry);
796 storeUnlockObject(fetch->old_entry);
797 fetch->old_entry = NULL;
798 }
1543ab6c 799 /* update global stats */
83704487 800 kb_incr(&statCounter.cd.kbytes_sent, (size_t) fetch->sent.bytes);
801 kb_incr(&statCounter.cd.kbytes_recv, (size_t) fetch->recv.bytes);
802 statCounter.cd.msgs_sent += fetch->sent.msg;
803 statCounter.cd.msgs_recv += fetch->recv.msg;
e13ee7ad 804
1543ab6c 805 /* unlock everything */
06d2839d 806 storeUnregister(fetch->sc, fetch->entry, fetch);
9b7de833 807 storeUnlockObject(fetch->entry);
903c39e4 808 requestUnlink(fetch->request);
9b7de833 809 fetch->entry = NULL;
903c39e4 810 fetch->request = NULL;
3855c318 811 assert(fetch->pd == NULL);
9b7de833 812 cbdataFree(fetch);
9b7de833 813}
814
e13ee7ad 815/* calculate fetch stats after completion */
816static void
817peerDigestFetchSetStats(DigestFetchState * fetch)
818{
819 MemObject *mem;
820 assert(fetch->entry && fetch->request);
821
822 mem = fetch->entry->mem_obj;
823 assert(mem);
824
825 /* XXX: outgoing numbers are not precise */
826 /* XXX: we must distinguish between 304 hits and misses here */
827 fetch->sent.bytes = httpRequestPrefixLen(fetch->request);
828 fetch->recv.bytes = fetch->entry->store_status == STORE_PENDING ?
829 mem->inmem_hi : mem->object_sz;
830 fetch->sent.msg = fetch->recv.msg = 1;
831 fetch->expires = fetch->entry->expires;
832 fetch->resp_time = squid_curtime - fetch->start_time;
833
834 debug(72, 3) ("peerDigestFetchFinish: recv %d bytes in %d secs\n",
84f2d773 835 fetch->recv.bytes, (int) fetch->resp_time);
38650cc8 836 debug(72, 3) ("peerDigestFetchFinish: expires: %ld (%+d), lmt: %ld (%+d)\n",
84f2d773 837 (long int) fetch->expires, (int) (fetch->expires - squid_curtime),
838 (long int) fetch->entry->lastmod, (int) (fetch->entry->lastmod - squid_curtime));
e13ee7ad 839}
840
841
9b7de833 842static int
8a6218c6 843peerDigestSetCBlock(PeerDigest * pd, const char *buf)
9b7de833 844{
845 StoreDigestCBlock cblock;
846 int freed_size = 0;
e13ee7ad 847 const char *host = strBuf(pd->host);
848
9b7de833 849 xmemcpy(&cblock, buf, sizeof(cblock));
850 /* network -> host conversions */
851 cblock.ver.current = ntohs(cblock.ver.current);
852 cblock.ver.required = ntohs(cblock.ver.required);
853 cblock.capacity = ntohl(cblock.capacity);
854 cblock.count = ntohl(cblock.count);
855 cblock.del_count = ntohl(cblock.del_count);
856 cblock.mask_size = ntohl(cblock.mask_size);
4b4cd312 857 debug(72, 2) ("got digest cblock from %s; ver: %d (req: %d)\n",
e13ee7ad 858 host, (int) cblock.ver.current, (int) cblock.ver.required);
4b4cd312 859 debug(72, 2) ("\t size: %d bytes, e-cnt: %d, e-util: %d%%\n",
9b7de833 860 cblock.mask_size, cblock.count,
861 xpercentInt(cblock.count, cblock.capacity));
6106c6fc 862 /* check version requirements (both ways) */
9b7de833 863 if (cblock.ver.required > CacheDigestVer.current) {
4b4cd312 864 debug(72, 1) ("%s digest requires version %d; have: %d\n",
e13ee7ad 865 host, cblock.ver.required, CacheDigestVer.current);
9b7de833 866 return 0;
867 }
6106c6fc 868 if (cblock.ver.current < CacheDigestVer.required) {
4b4cd312 869 debug(72, 1) ("%s digest is version %d; we require: %d\n",
e13ee7ad 870 host, cblock.ver.current, CacheDigestVer.required);
6106c6fc 871 return 0;
872 }
9b7de833 873 /* check consistency */
4b4cd312 874 if (cblock.ver.required > cblock.ver.current ||
d1cdaa16 875 cblock.mask_size <= 0 || cblock.capacity <= 0 ||
876 cblock.bits_per_entry <= 0 || cblock.hash_func_count <= 0) {
e13ee7ad 877 debug(72, 0) ("%s digest cblock is corrupted.\n", host);
9b7de833 878 return 0;
879 }
d1cdaa16 880 /* check consistency further */
e6ccf245 881 if ((size_t)cblock.mask_size != cacheDigestCalcMaskSize(cblock.capacity, cblock.bits_per_entry)) {
4b4cd312 882 debug(72, 0) ("%s digest cblock is corrupted (mask size mismatch: %d ? %d).\n",
e13ee7ad 883 host, cblock.mask_size, cacheDigestCalcMaskSize(cblock.capacity, cblock.bits_per_entry));
d1cdaa16 884 return 0;
885 }
886 /* there are some things we cannot do yet */
887 if (cblock.hash_func_count != CacheDigestHashFuncCount) {
4b4cd312 888 debug(72, 0) ("%s digest: unsupported #hash functions: %d ? %d.\n",
e13ee7ad 889 host, cblock.hash_func_count, CacheDigestHashFuncCount);
d1cdaa16 890 return 0;
891 }
9b7de833 892 /*
893 * no cblock bugs below this point
894 */
895 /* check size changes */
e6ccf245 896 if (pd->cd && cblock.mask_size != (ssize_t)pd->cd->mask_size) {
4b4cd312 897 debug(72, 2) ("%s digest changed size: %d -> %d\n",
8a6218c6 898 host, cblock.mask_size, pd->cd->mask_size);
e13ee7ad 899 freed_size = pd->cd->mask_size;
900 cacheDigestDestroy(pd->cd);
901 pd->cd = NULL;
9b7de833 902 }
e13ee7ad 903 if (!pd->cd) {
45694534 904 debug(72, 2) ("creating %s digest; size: %d (%+d) bytes\n",
e13ee7ad 905 host, cblock.mask_size, (int) (cblock.mask_size - freed_size));
906 pd->cd = cacheDigestCreate(cblock.capacity, cblock.bits_per_entry);
9b7de833 907 if (cblock.mask_size >= freed_size)
83704487 908 kb_incr(&statCounter.cd.memory, cblock.mask_size - freed_size);
9b7de833 909 }
e13ee7ad 910 assert(pd->cd);
9b7de833 911 /* these assignments leave us in an inconsistent state until we finish reading the digest */
e13ee7ad 912 pd->cd->count = cblock.count;
913 pd->cd->del_count = cblock.del_count;
9b7de833 914 return 1;
915}
916
9b7de833 917static int
8a6218c6 918peerDigestUseful(const PeerDigest * pd)
9b7de833 919{
d1cdaa16 920 /* TODO: we should calculate the prob of a false hit instead of bit util */
e13ee7ad 921 const int bit_util = cacheDigestBitUtil(pd->cd);
922 if (bit_util > 65) {
4b4cd312 923 debug(72, 0) ("Warning: %s peer digest has too many bits on (%d%%).\n",
e13ee7ad 924 strBuf(pd->host), bit_util);
d1cdaa16 925 return 0;
9b7de833 926 }
927 return 1;
928}
7f6eb0fe 929
e13ee7ad 930static int
931saneDiff(time_t diff)
932{
8a6218c6 933 return abs(diff) > squid_curtime / 2 ? 0 : diff;
e13ee7ad 934}
935
936void
8a6218c6 937peerDigestStatsReport(const PeerDigest * pd, StoreEntry * e)
e13ee7ad 938{
939#define f2s(flag) (pd->flags.flag ? "yes" : "no")
38650cc8 940#define appendTime(tm) storeAppendPrintf(e, "%s\t %10ld\t %+d\t %+d\n", \
941 ""#tm, (long int)pd->times.tm, \
e13ee7ad 942 saneDiff(pd->times.tm - squid_curtime), \
943 saneDiff(pd->times.tm - pd->times.initialized))
944
945 const char *host = pd ? strBuf(pd->host) : NULL;
946 assert(pd);
947
948 storeAppendPrintf(e, "\npeer digest from %s\n", host);
949
950 cacheDigestGuessStatsReport(&pd->stats.guess, e, host);
951
952 storeAppendPrintf(e, "\nevent\t timestamp\t secs from now\t secs from init\n");
953 appendTime(initialized);
954 appendTime(needed);
955 appendTime(requested);
956 appendTime(received);
957 appendTime(next_check);
958
959 storeAppendPrintf(e, "peer digest state:\n");
960 storeAppendPrintf(e, "\tneeded: %3s, usable: %3s, requested: %3s\n",
961 f2s(needed), f2s(usable), f2s(requested));
962 storeAppendPrintf(e, "\n\tlast retry delay: %d secs\n",
84f2d773 963 (int) pd->times.retry_delay);
e13ee7ad 964 storeAppendPrintf(e, "\tlast request response time: %d secs\n",
84f2d773 965 (int) pd->times.req_delay);
e13ee7ad 966 storeAppendPrintf(e, "\tlast request result: %s\n",
967 pd->req_result ? pd->req_result : "(none)");
968
969 storeAppendPrintf(e, "\npeer digest traffic:\n");
970 storeAppendPrintf(e, "\trequests sent: %d, volume: %d KB\n",
971 pd->stats.sent.msgs, (int) pd->stats.sent.kbytes.kb);
972 storeAppendPrintf(e, "\treplies recv: %d, volume: %d KB\n",
973 pd->stats.recv.msgs, (int) pd->stats.recv.kbytes.kb);
974
975 storeAppendPrintf(e, "\npeer digest structure:\n");
976 if (pd->cd)
977 cacheDigestReport(pd->cd, host, e);
978 else
979 storeAppendPrintf(e, "\tno in-memory copy\n");
980}
981
7f6eb0fe 982#endif