/*
- * $Id: peer_digest.cc,v 1.57 1998/11/12 06:28:18 wessels Exp $
+ * $Id: peer_digest.cc,v 1.58 1998/11/13 21:02:07 rousskov Exp $
*
* DEBUG: section 72 Peer Digest Routines
* AUTHOR: Alex Rousskov
/* local types */
/* local prototypes */
-static void peerDigestClean(peer * p);
-static time_t peerDigestNextDisDelay(const peer * p);
-static time_t peerDigestExpiresDelay(const peer * p, const StoreEntry * e);
-static void peerDigestDisable(peer * p);
-static void peerDigestDelay(peer * p, int disable, time_t delay);
-static EVH peerDigestValidate;
-static void peerDigestRequest(peer * p);
+static time_t peerDigestIncDelay(const PeerDigest * pd);
+static time_t peerDigestNewDelay(const StoreEntry * e);
+static void peerDigestSetCheck(PeerDigest * pd, time_t delay);
+static EVH peerDigestCheck;
+static void peerDigestRequest(PeerDigest * pd);
static STCB peerDigestFetchReply;
-static void peerDigestRequest(peer * p);
static STCB peerDigestSwapInHeaders;
static STCB peerDigestSwapInCBlock;
static STCB peerDigestSwapInMask;
static int peerDigestFetchedEnough(DigestFetchState * fetch, char *buf, ssize_t size, const char *step_name);
-static void peerDigestFetchFinish(DigestFetchState * fetch, char *buf, const char *err_msg);
-static int peerDigestSetCBlock(peer * p, const char *buf);
-static int peerDigestUseful(const peer * peer);
-#define max_delay(t1,t2) ((t1) >= (t2) ? (t1) : (t2))
+static void peerDigestFetchAbort(DigestFetchState * fetch, char *buf, const char *reason);
+static void peerDigestReqFinish(DigestFetchState * fetch, char *buf, int fcb_valid, int pdcb_valid, int pcb_valid, const char *reason, int err);
+static void peerDigestPDFinish(DigestFetchState * fetch, int pcb_valid, int err);
+static void peerDigestFetchFinish(DigestFetchState * fetch, int err);
+static void peerDigestFetchSetStats(DigestFetchState * fetch);
+static int peerDigestSetCBlock(PeerDigest * pd, const char *buf);
+static int peerDigestUseful(const PeerDigest * pd);
/* local constants */
#define StoreDigestCBlockSize sizeof(StoreDigestCBlock)
-/* min interval for requesting digests from the same peer */
-static const time_t PeerDigestRequestMinGap = 5 * 60; /* seconds */
-/* min interval for requesting digests at start */
-static const time_t GlobalDigestRequestMinGap = 1 * 60; /* seconds */
+/* min interval for requesting digests from a given peer */
+static const time_t PeerDigestReqMinGap = 5 * 60; /* seconds */
+/* min interval between digest requests, across all peers (global limit) */
+static const time_t GlobDigestReqMinGap = 1 * 60; /* seconds */
/* local vars */
-static time_t global_last_req_timestamp = 0;
+static time_t pd_last_req_time = 0; /* time of the last digest request (any peer) */
-/* always call cbdataLock() before calling this! */
-void
-peerDigestInit(void *data)
+/* initialize peer digest */
+static void
+peerDigestInit(PeerDigest *pd, peer *p)
{
- peer *p = data;
- assert(p);
- if (!cbdataValid(p)) { /* peer disappeared */
- cbdataUnlock(p);
- return;
- }
- assert(p->digest.flags.init_pending);
- assert(!p->digest.flags.inited);
- assert(!p->digest.cd);
- assert(SM_PAGE_SIZE == 4096); /* we use MEM_4K_BUF */
- if (p->options.no_digest) {
- peerDigestClean(p);
- } else {
- peerDigestValidate(p);
- }
- p->digest.flags.inited = 1;
- p->digest.flags.init_pending = 0;
+ assert(pd && p);
+
+ memset(pd, 0, sizeof(*pd));
+ pd->peer = p;
+    /* if peer disappears, we will know its name */
+ stringInit(&pd->host, p->host);
+
+ pd->times.initialized = squid_curtime;
}
-/* no pending events or requests should exist when you call this */
static void
-peerDigestClean(peer * p)
+peerDigestClean(PeerDigest *pd)
{
- if (cbdataValid(p)) {
- assert(!p->digest.flags.requested);
- peerDigestDisable(p);
- }
- cbdataUnlock(p);
+ assert(pd);
+ if (pd->cd)
+ cacheDigestDestroy(pd->cd);
+ stringClean(&pd->host);
}
+/* allocate a new peer digest, initialize it, and lock the peer */
+PeerDigest *
+peerDigestCreate(peer *p)
+{
+ PeerDigest *pd;
+ assert(p);
+ /* cannot check cbdataValid(p) because p may not be locked yet */
+
+ pd = memAllocate(MEM_PEER_DIGEST);
+ cbdataAdd(pd, MEM_PEER_DIGEST);
+ peerDigestInit(pd, p);
+ cbdataLock(pd->peer); /* we will use the peer */
+
+ return pd;
+}
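+
+/*
+ * Lifecycle sketch (editorial illustration; the call sites live outside this
+ * file and are assumed here):
+ *
+ *     pd = peerDigestCreate(p);     -- give the peer a digest to manage
+ *     peerDigestNeeded(pd);         -- when somebody first needs it; this
+ *                                      schedules peerDigestCheck() asap
+ *     peerDigestDestroy(pd);        -- when the digest is no longer wanted,
+ *                                      or via peerDigestNotePeerGone()
+ */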
+
+/* call Clean and free/unlock everything */
+void
+peerDigestDestroy(PeerDigest *pd)
+{
+ assert(pd);
+ assert(cbdataValid(pd));
+
+ /* inform peer (if any) that we are gone */
+ if (cbdataValid(pd->peer))
+ peerNoteDigestGone(pd->peer);
+ cbdataUnlock(pd->peer); /* must unlock, valid or not */
+ pd->peer = NULL;
+
+ peerDigestClean(pd);
+ cbdataFree(pd);
+}
+
+/* called by peer to indicate that somebody actually needs this digest */
+void
+peerDigestNeeded(PeerDigest *pd)
+{
+ assert(pd);
+ assert(!pd->flags.needed);
+ assert(!pd->cd);
+
+ pd->flags.needed = 1;
+ pd->times.needed = squid_curtime;
+ peerDigestSetCheck(pd, 0); /* check asap */
+}
+
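+/* note (editorial): peerDigestSetCheck(pd, 0) merely schedules peerDigestCheck()
+ * for the next event loop pass; the actual request may still be postponed there
+ * by the PeerDigestReqMinGap and GlobDigestReqMinGap limits */
+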
+/* currently we do not have a reason to disable without destroying */
+#if FUTURE_CODE
/* disables peer for good */
static void
-peerDigestDisable(peer * p)
+peerDigestDisable(PeerDigest *pd)
{
- peerDigestDelay(p, 1, -1);
+ debug(72, 2) ("peerDigestDisable: peer %s disabled for good\n",
+ strBuf(pd->host));
+ pd->times.disabled = squid_curtime;
+ pd->times.next_check = -1; /* never */
+ pd->flags.usable = 0;
+
+ if (pd->cd) {
+ cacheDigestDestroy(pd->cd);
+ pd->cd = NULL;
+ }
+ /* we do not destroy the pd itself to preserve its "history" and stats */
}
+#endif
-/* next delay for a disabled entry */
+/* increment retry delay [after an unsuccessful attempt] */
static time_t
-peerDigestNextDisDelay(const peer * p)
+peerDigestIncDelay(const PeerDigest *pd)
{
- assert(p);
- return p->digest.last_dis_delay ?
- 2 * p->digest.last_dis_delay : /* exponential backoff */
- PeerDigestRequestMinGap; /* minimal delay */
+ assert(pd);
+ return pd->times.retry_delay > 0 ?
+ 2*pd->times.retry_delay : /* exponential backoff */
+ PeerDigestReqMinGap; /* minimal delay */
}
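+
+/* illustration (editorial): with PeerDigestReqMinGap of 5 minutes, consecutive
+ * failed attempts are retried after roughly 5, 10, 20, 40, ... minutes;
+ * a successful fetch resets retry_delay to zero (see peerDigestReqFinish) */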
-/* artificially increases expires to avoid race conditions */
+/* artificially increases the Expires: value to avoid race conditions;
+ * returns the delay until that [increased] expiration time */
static time_t
-peerDigestExpiresDelay(const peer * p, const StoreEntry * e)
+peerDigestNewDelay(const StoreEntry * e)
{
- assert(p);
- if (!e)
- return 0;
+ assert(e);
if (e->expires > 0)
- return e->expires + PeerDigestRequestMinGap - squid_curtime;
- return PeerDigestRequestMinGap;
+ return e->expires + PeerDigestReqMinGap - squid_curtime;
+ return PeerDigestReqMinGap;
}
-
-/* delays/disables digest for a psecified delay (disables forever if negative delay) */
+/* registers next digest verification */
static void
-peerDigestDelay(peer * p, int disable, time_t delay)
+peerDigestSetCheck(PeerDigest * pd, time_t delay)
{
- assert(p);
- if (disable) {
- p->digest.flags.disabled = 1;
- p->digest.last_dis_delay = delay;
- }
- if (delay >= 0) {
- assert(delay || !disable);
- debug(72, 2) ("peerDigestDelay: %s: peer %s for %d secs till %s\n",
- disable ? "disabling" : "delaying",
- p->host ? p->host : "<null>",
- delay, mkrfc1123(squid_curtime + delay));
- eventAdd("peerDigestValidate", peerDigestValidate, p, (double) delay, 1);
+ cbdataLock(pd);
+ eventAdd("peerDigestCheck", peerDigestCheck, pd, (double) delay, 1);
+ pd->times.next_check = squid_curtime + delay;
+ debug(72, 3) ("peerDigestSetCheck: will check peer %s in %d secs\n",
+ strBuf(pd->host), delay);
+}
+
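+/* note (editorial): the cbdataLock(pd) in peerDigestSetCheck() is paired with
+ * the cbdataUnlock(pd) at the top of peerDigestCheck(); it protects pd only for
+ * the lifetime of the scheduled event, while the fetch itself takes its own
+ * locks in peerDigestRequest() */
+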
+/* called only when cbdataValid(pd) and the peer is either
+ * about to disappear or has already disappeared */
+void
+peerDigestNotePeerGone(PeerDigest *pd)
+{
+ assert(cbdataValid(pd));
+ if (pd->flags.requested) {
+ debug(72, 2) ("peerDigest: peer %s is gone, will destroy after fetch.\n", strBuf(pd->host));
+ /* do nothing now, the fetching chain will notice and take action */
} else {
- assert(disable);
- debug(72, 2) ("peerDigestDisable: disabling peer %s for good\n",
- p->host ? p->host : "<null>");
- /* just in case, will not need it anymore */
- p->digest.flags.usable = 0;
+ debug(72, 2) ("peerDigest: peer %s is gone, destroying now.\n", strBuf(pd->host));
+ peerDigestDestroy(pd);
}
}
-/* request new digest if our copy is too old; schedule next validation */
+/* callback for eventAdd() (with peer digest locked)
+ * request new digest if our copy is too old or if we lack one;
+ * schedule next check otherwise */
static void
-peerDigestValidate(void *data)
+peerDigestCheck(void *data)
{
- peer *p = data;
- StoreEntry *e = NULL;
- int do_request;
- time_t req_time = squid_curtime;
- assert(p);
- if (!cbdataValid(p)) {
- peerDigestClean(p);
+ PeerDigest *pd = data;
+ time_t req_time;
+
+ assert(pd);
+
+ if (!cbdataValid(pd)) {
+ cbdataUnlock(pd);
return;
}
- debug(72, 3) ("peerDigestValidate: digest %s\n", p->host);
- debug(72, 3) ("current GMT time: %s\n", mkrfc1123(squid_curtime));
- assert(!p->digest.flags.requested);
- debug(72, 3) ("peerDigestValidate: %s was %s disabled\n",
- p->host, p->digest.last_dis_delay ? "" : "not");
- if (1 /* p->digest.cd */ ) {
- const cache_key *key;
- const char *u = internalRemoteUri(p->host, p->http_port, "/squid-internal-periodic/", StoreDigestUrlPath);
- key = storeKeyPublic(u, METHOD_GET);
- e = storeGet(key);
- debug(72, 3) ("peerDigestValidate: %s store entry, key: %s, exp: %s\n",
- e ? "has" : "no", storeKeyText(key), mkrfc1123(e ? e->expires : 0));
- }
- /* currently we rely on entry->expire information */
- {
- const int loaded = p->digest.cd != NULL;
- const time_t exp_delay = loaded ? peerDigestExpiresDelay(p, e) : 0;
- do_request = exp_delay <= 0;
- req_time = squid_curtime + exp_delay;
- if (req_time < squid_curtime)
- req_time = squid_curtime;
- }
- /* do not request too often from one peer */
- if (req_time - p->digest.last_req_timestamp < PeerDigestRequestMinGap) {
- if (do_request) {
- debug(72, 2) ("peerDigestValidate: %s, avoiding too close peer requests (%d secs).\n",
- p->host, req_time - p->digest.last_req_timestamp);
- do_request = 0;
- }
- req_time = p->digest.last_req_timestamp + PeerDigestRequestMinGap;
+ cbdataUnlock(pd); /* non-blocking event is over */
+
+ assert(!pd->flags.requested);
+ pd->times.next_check = 0; /* unknown */
+
+ if (!cbdataValid(pd->peer)) {
+ peerDigestNotePeerGone(pd);
+ return;
}
- /* at start, do not request too often from all peers */
- if (!p->digest.flags.inited &&
- req_time - global_last_req_timestamp < GlobalDigestRequestMinGap) {
- if (do_request) {
- debug(72, 2) ("peerDigestValidate: %s, avoiding too close requests (%d secs).\n",
- p->host, req_time - global_last_req_timestamp);
- do_request = 0;
- }
- req_time = global_last_req_timestamp + GlobalDigestRequestMinGap;
- /* otherwise we have all but one peer returning at the same moment @?@ */
- debug(72, 5) ("peerDigestValidate: inc req_time (%+d) in anticipation of more reqs\n",
- (int) (req_time - global_last_req_timestamp));
- global_last_req_timestamp = req_time;
+
+ debug(72, 3) ("peerDigestCheck: peer %s:%d\n", pd->peer->host, pd->peer->http_port);
+ debug(72, 3) ("peerDigestCheck: time: %d, last received: %d (%+d)\n",
+ squid_curtime, pd->times.received, (squid_curtime-pd->times.received));
+
+ /* decide when we should send the request:
+ * request now unless too close to other requests */
+ req_time = squid_curtime;
+
+ /* per-peer limit */
+ if (req_time - pd->times.received < PeerDigestReqMinGap) {
+ debug(72, 2) ("peerDigestCheck: %s, avoiding close peer requests (%d < %d secs).\n",
+ strBuf(pd->host), req_time - pd->times.received,
+ PeerDigestReqMinGap);
+ req_time = pd->times.received + PeerDigestReqMinGap;
}
- /* start request if needed */
- if (do_request) {
- static int nest_level = 0;
- nest_level++;
- assert(nest_level == 1);
- debug(72, 2) ("peerDigestValidate: %s requesting; old entry expires: %s\n",
- p->host, e ? mkrfc1123(e->expires) : "no entry");
- /* will eventually disable digests or call peerDigest Delay */
- peerDigestRequest(p);
- nest_level--;
- } else {
- /* schedule next re-validation */
- assert(req_time > squid_curtime);
- peerDigestDelay(p, !p->digest.cd, req_time - squid_curtime);
+ /* global limit */
+ if (req_time - pd_last_req_time < GlobDigestReqMinGap) {
+ debug(72, 2) ("peerDigestCheck: %s, avoiding close requests (%d < %d secs).\n",
+ strBuf(pd->host), req_time - pd_last_req_time,
+ GlobDigestReqMinGap);
+ req_time = pd_last_req_time + GlobDigestReqMinGap;
}
+
+ if (req_time <= squid_curtime)
+ peerDigestRequest(pd); /* will set pd->flags.requested */
+ else
+ peerDigestSetCheck(pd, req_time - squid_curtime);
}
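+
+/* note (editorial): req_time starts at "now" and is pushed forward by the
+ * per-peer gap (last digest received + PeerDigestReqMinGap) and the global gap
+ * (last request from any peer + GlobDigestReqMinGap); if it ends up in the
+ * future, the check is simply re-scheduled instead of requesting */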
-/* ask peer cache for a fresh digest */
+/* ask store for a digest */
static void
-peerDigestRequest(peer * p)
+peerDigestRequest(PeerDigest * pd)
{
+ peer *p = pd->peer;
StoreEntry *e, *old_e;
char *url;
const cache_key *key;
request_t *req;
DigestFetchState *fetch = NULL;
- assert(p);
- p->digest.flags.requested = 1;
+
+ pd->req_result = NULL;
+ pd->flags.requested = 1;
+
/* compute future request components */
- url = internalRemoteUri(p->host, p->http_port, "/squid-internal-periodic/", StoreDigestUrlPath);
+ url = internalRemoteUri(p->host, p->http_port, "/squid-internal-periodic/", StoreDigestFileName);
key = storeKeyPublic(url, METHOD_GET);
debug(72, 2) ("peerDigestRequest: %s key: %s\n", url, storeKeyText(key));
req = urlParse(METHOD_GET, url);
- if (NULL == req) {
- debug(72, 1) ("peerDigestRequest: Bad URI: %s\n", url);
- return; /* @?@ */
- }
+ assert(req);
+
/* add custom headers */
assert(!req->header.len);
httpHeaderPutStr(&req->header, HDR_ACCEPT, StoreDigestMimeStr);
httpHeaderPutStr(&req->header, HDR_ACCEPT, "text/html");
+
/* create fetch state structure */
fetch = memAllocate(MEM_DIGEST_FETCH_STATE);
cbdataAdd(fetch, MEM_DIGEST_FETCH_STATE);
- cbdataLock(fetch);
fetch->request = requestLink(req);
- fetch->peer = p;
+ fetch->pd = pd;
+ fetch->offset = 0;
+
+ /* update timestamps */
fetch->start_time = squid_curtime;
- p->digest.last_req_timestamp = squid_curtime;
- global_last_req_timestamp = squid_curtime;
+ pd->times.requested = squid_curtime;
+ pd_last_req_time = squid_curtime;
+
req->flags.cachable = 1;
/* the rest is based on clientProcessExpired() */
req->flags.refresh = 1;
storeClientListAdd(old_e, fetch);
}
e = fetch->entry = storeCreateEntry(url, url, req->flags, req->method);
- debug(72, 5) ("peerDigestRequest: new entry is private: %d\n",
- (int) EBIT_TEST(e->flags, KEY_PRIVATE));
+ assert(EBIT_TEST(e->flags, KEY_PRIVATE));
storeClientListAdd(e, fetch);
/* set lastmod to trigger IMS request if possible */
if (old_e)
e->lastmod = old_e->lastmod;
- fetch->offset = 0;
- debug(72, 3) ("peerDigestRequest: forwarding to fwdStart...\n");
+
/* push towards peer cache */
+ debug(72, 3) ("peerDigestRequest: forwarding to fwdStart...\n");
fwdStart(-1, e, req, no_addr);
- storeClientCopy(e, 0, 0, SM_PAGE_SIZE, memAllocate(MEM_4K_BUF),
+ cbdataLock(fetch);
+ cbdataLock(fetch->pd);
+ storeClientCopy(e, 0, 0, 4096, memAllocate(MEM_4K_BUF),
peerDigestFetchReply, fetch);
}
-/* waits for full http headers to be received and parses them */
+/* wait for full http headers to be received then parse them */
static void
peerDigestFetchReply(void *data, char *buf, ssize_t size)
{
DigestFetchState *fetch = data;
- peer *peer = fetch->peer;
- assert(peer && buf);
+ PeerDigest *pd = fetch->pd;
+ assert(pd && buf);
assert(!fetch->offset);
+
if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestFetchReply"))
return;
+
if (headersEnd(buf, size)) {
http_status status;
HttpReply *reply = fetch->entry->mem_obj->reply;
assert(reply);
httpReplyParse(reply, buf);
status = reply->sline.status;
- debug(72, 3) ("peerDigestFetchReply: %s status: %d, expires: %s\n",
- peer->host, status, mkrfc1123(reply->expires));
+ debug(72, 3) ("peerDigestFetchReply: %s status: %d, expires: %d (%+d)\n",
+ strBuf(pd->host), status,
+ reply->expires, reply->expires-squid_curtime);
+
/* this "if" is based on clientHandleIMSReply() */
if (status == HTTP_NOT_MODIFIED) {
request_t *r = NULL;
storeTimestampsSet(fetch->old_entry);
/* get rid of 304 reply */
storeUnregister(fetch->entry, fetch);
- /* paranoid assert: storeUnregister should not call us recursively */
- assert(fetch->entry);
storeUnlockObject(fetch->entry);
fetch->entry = fetch->old_entry;
fetch->old_entry = NULL;
/* preserve request -- we need its size to update counters */
/* requestUnlink(r); */
/* fetch->entry->mem_obj->request = NULL; */
- assert(fetch->entry->mem_obj);
} else if (status == HTTP_OK) {
/* get rid of old entry if any */
if (fetch->old_entry) {
- debug(72, 3) ("peerDigestFetchReply: got new digest, requesting release of old digest\n");
+ debug(72, 3) ("peerDigestFetchReply: got new digest, releasing old one\n");
storeUnregister(fetch->old_entry, fetch);
storeReleaseRequest(fetch->old_entry);
storeUnlockObject(fetch->old_entry);
}
} else {
/* some kind of a bug */
- peerDigestFetchFinish(fetch, buf, httpStatusLineReason(&reply->sline));
+ peerDigestFetchAbort(fetch, buf, httpStatusLineReason(&reply->sline));
return;
}
/* must have a ready-to-use store entry if we got here */
- /* can we stay with the old digest? */
- if (status == HTTP_NOT_MODIFIED && fetch->peer->digest.cd)
- peerDigestFetchFinish(fetch, buf, NULL);
+ /* can we stay with the old in-memory digest? */
+ if (status == HTTP_NOT_MODIFIED && fetch->pd->cd)
+ peerDigestFetchAbort(fetch, buf, NULL);
else
storeClientCopy(fetch->entry, /* have to swap in */
0, 0, SM_PAGE_SIZE, buf, peerDigestSwapInHeaders, fetch);
- return;
} else {
/* need more data, do we have space? */
if (size >= SM_PAGE_SIZE)
- peerDigestFetchFinish(fetch, buf, "too big header");
+ peerDigestFetchAbort(fetch, buf, "reply header too big");
else
storeClientCopy(fetch->entry, size, 0, SM_PAGE_SIZE, buf,
peerDigestFetchReply, fetch);
peerDigestSwapInHeaders(void *data, char *buf, ssize_t size)
{
DigestFetchState *fetch = data;
- peer *peer;
size_t hdr_size;
+
if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestSwapInHeaders"))
return;
- peer = fetch->peer;
- assert(peer && buf);
+
assert(!fetch->offset);
if ((hdr_size = headersEnd(buf, size))) {
assert(fetch->entry->mem_obj->reply);
httpReplyParse(fetch->entry->mem_obj->reply, buf);
if (fetch->entry->mem_obj->reply->sline.status != HTTP_OK) {
debug(72, 1) ("peerDigestSwapInHeaders: %s status %d got cached!\n",
- peer->host, fetch->entry->mem_obj->reply->sline.status);
- peerDigestFetchFinish(fetch, buf, "internal status error");
+ strBuf(fetch->pd->host), fetch->entry->mem_obj->reply->sline.status);
+ peerDigestFetchAbort(fetch, buf, "internal status error");
return;
}
fetch->offset += hdr_size;
} else {
/* need more data, do we have space? */
if (size >= SM_PAGE_SIZE)
- peerDigestFetchFinish(fetch, buf, "too big stored header");
+ peerDigestFetchAbort(fetch, buf, "stored header too big");
else
storeClientCopy(fetch->entry, size, 0, SM_PAGE_SIZE, buf,
peerDigestSwapInHeaders, fetch);
peerDigestSwapInCBlock(void *data, char *buf, ssize_t size)
{
DigestFetchState *fetch = data;
+
if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestSwapInCBlock"))
return;
+
if (size >= StoreDigestCBlockSize) {
- peer *peer = fetch->peer;
+ PeerDigest *pd = fetch->pd;
HttpReply *rep = fetch->entry->mem_obj->reply;
const int seen = fetch->offset + size;
- assert(peer && rep);
- if (peerDigestSetCBlock(peer, buf)) {
+ assert(pd && rep);
+ if (peerDigestSetCBlock(pd, buf)) {
+ /* XXX: soon we will have variable header size */
fetch->offset += StoreDigestCBlockSize;
- /* switch to CD buffer */
+ /* switch to CD buffer and fetch digest guts */
memFree(MEM_4K_BUF, buf);
buf = NULL;
- assert(peer->digest.cd->mask);
+ assert(pd->cd->mask);
storeClientCopy(fetch->entry,
seen,
fetch->offset,
- peer->digest.cd->mask_size,
- peer->digest.cd->mask,
+ pd->cd->mask_size,
+ pd->cd->mask,
peerDigestSwapInMask, fetch);
} else {
- peerDigestFetchFinish(fetch, buf, "invalid digest cblock");
+ peerDigestFetchAbort(fetch, buf, "invalid digest cblock");
}
} else {
/* need more data, do we have space? */
if (size >= SM_PAGE_SIZE)
- peerDigestFetchFinish(fetch, buf, "too big cblock");
+ peerDigestFetchAbort(fetch, buf, "digest cblock too big");
else
storeClientCopy(fetch->entry, size, 0, SM_PAGE_SIZE, buf,
peerDigestSwapInCBlock, fetch);
peerDigestSwapInMask(void *data, char *buf, ssize_t size)
{
DigestFetchState *fetch = data;
- peer *peer;
- HttpReply *rep;
- size_t buf_sz;
- /*
- * NOTE! buf points to the middle of peer->digest.cd->mask!
- */
+ PeerDigest *pd;
+
+ /* NOTE! buf points to the middle of pd->cd->mask! */
if (peerDigestFetchedEnough(fetch, NULL, size, "peerDigestSwapInMask"))
return;
- peer = fetch->peer;
- rep = fetch->entry->mem_obj->reply;
- assert(peer && rep);
- assert(peer->digest.cd && peer->digest.cd->mask);
+ pd = fetch->pd;
+ assert(pd->cd && pd->cd->mask);
fetch->offset += size;
fetch->mask_offset += size;
- if (fetch->mask_offset >= peer->digest.cd->mask_size) {
+ if (fetch->mask_offset >= pd->cd->mask_size) {
debug(72, 2) ("peerDigestSwapInMask: Done! Got %d, expected %d\n",
- fetch->mask_offset, peer->digest.cd->mask_size);
- assert(fetch->mask_offset == peer->digest.cd->mask_size);
- peerDigestFetchFinish(fetch, NULL, NULL);
- return;
+ fetch->mask_offset, pd->cd->mask_size);
+ assert(fetch->mask_offset == pd->cd->mask_size);
+ assert(peerDigestFetchedEnough(fetch, NULL, 0, "peerDigestSwapInMask"));
+ } else {
+ const size_t buf_sz = pd->cd->mask_size - fetch->mask_offset;
+ assert(buf_sz > 0);
+ storeClientCopy(fetch->entry,
+ fetch->offset,
+ fetch->offset,
+ buf_sz,
+ pd->cd->mask + fetch->mask_offset,
+ peerDigestSwapInMask, fetch);
}
- buf_sz = peer->digest.cd->mask_size - fetch->mask_offset;
- assert(buf_sz > 0);
- storeClientCopy(fetch->entry,
- fetch->offset,
- fetch->offset,
- buf_sz,
- peer->digest.cd->mask + fetch->mask_offset,
- peerDigestSwapInMask, fetch);
}
static int
peerDigestFetchedEnough(DigestFetchState * fetch, char *buf, ssize_t size, const char *step_name)
{
- const char *reason = NULL;
- const char *no_bug = NULL;
+ PeerDigest *pd = NULL;
+ const char *host = "<unknown>"; /* peer host */
+ const char *reason = NULL; /* reason for completion */
+ const char *no_bug = NULL; /* successful completion if set */
const int fcb_valid = cbdataValid(fetch);
- const int pcb_valid = fcb_valid && cbdataValid(fetch->peer);
-
- if (pcb_valid)
- debug(72, 6) ("%s: %s offset: %d size: %d.\n",
- step_name, fetch->peer->host, fetch->offset, size);
-
-
- /* test exiting conditions */
- if (!fcb_valid)
- reason = "fetch aborted?";
- else if (!pcb_valid)
- reason = "peer disappeared";
- else if (size < 0)
- reason = "swap failure";
- else if (!size)
- reason = no_bug = "eof";
- else if (!fetch->entry)
- reason = "swap abort(?)";
- else if (fetch->entry->store_status == STORE_ABORTED)
- reason = "swap abort";
-
- /* report exit reason */
+ const int pdcb_valid = fcb_valid && cbdataValid(fetch->pd);
+ const int pcb_valid = pdcb_valid && cbdataValid(fetch->pd->peer);
+
+ /* test possible exiting conditions (the same for most steps!)
+ * cases marked with '?!' should not happen */
+
+ if (!reason) {
+ if (!fcb_valid)
+ reason = "fetch aborted?!";
+ else if (!(pd = fetch->pd))
+ reason = "peer digest disappeared?!";
+ else if (!cbdataValid(pd))
+ reason = "invalidated peer digest?!";
+ else
+ host = strBuf(pd->host);
+ }
+
+ debug(72, 6) ("%s: peer %s, offset: %d size: %d.\n",
+ step_name, host, fcb_valid ? fetch->offset : -1, size);
+
+ /* continue checking (with pd and host known and valid) */
+ if (!reason) {
+ if (!cbdataValid(pd->peer))
+ reason = "peer disappeared";
+ else if (size < 0)
+ reason = "swap failure";
+ else if (!fetch->entry)
+ reason = "swap aborted?!";
+ else if (fetch->entry->store_status == STORE_ABORTED)
+ reason = "swap aborted";
+ }
+
+ /* continue checking (maybe-successful eof case) */
+ if (!reason && !size) {
+ if (!pd->cd)
+ reason = "null digest?!";
+ else if (fetch->mask_offset != pd->cd->mask_size)
+ reason = "premature end of digest?!";
+ else if (!peerDigestUseful(pd))
+ reason = "useless digest";
+ else
+ reason = no_bug = "success";
+ }
+
+ /* finish if we have a reason */
if (reason) {
- debug(72, pcb_valid ? 3 : 1) ("%s: exiting on %s\n", step_name, reason);
- if (pcb_valid) {
- peerDigestFetchFinish(fetch, buf, no_bug ? NULL : reason);
+ const int level = strstr(reason, "?!") ? 1 : 3;
+ debug(72, level) ("%s: peer %s, exiting after '%s'\n",
+ step_name, host, reason);
+ peerDigestReqFinish(fetch, buf,
+ fcb_valid, pdcb_valid, pcb_valid, reason, !no_bug);
+ } else {
+ /* paranoid check */
+ assert(fcb_valid && pdcb_valid && pcb_valid);
+ }
+ return reason != NULL;
+}
+
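+/* note (editorial): the three validity flags above form a chain -- pcb_valid
+ * implies pdcb_valid implies fcb_valid -- and all three are passed down to
+ * peerDigestReqFinish() so it can clean up exactly the pieces that are still
+ * valid */
+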
+/* call this when all callback data is valid but something bad happened */
+static void
+peerDigestFetchAbort(DigestFetchState * fetch, char *buf, const char *reason)
+{
+ peerDigestReqFinish(fetch, buf, 1, 1, 1, reason, 1);
+}
+
+/* complete the digest transfer, update stats, unlock/release everything */
+static void
+peerDigestReqFinish(DigestFetchState * fetch, char *buf,
+ int fcb_valid, int pdcb_valid, int pcb_valid,
+ const char *reason, int err)
+{
+ assert(reason);
+
+ /* must go before peerDigestPDFinish */
+ if (pdcb_valid) {
+ fetch->pd->flags.requested = 0;
+ fetch->pd->req_result = reason;
+ }
+
+ /* schedule next check if peer is still out there */
+ if (pcb_valid) {
+ PeerDigest *pd = fetch->pd;
+ if (err) {
+ pd->times.retry_delay = peerDigestIncDelay(pd);
+ peerDigestSetCheck(pd, pd->times.retry_delay);
} else {
- /* XXX: this is probably wrong, but
- * !fcb_valid implies !pcb_valid so we cannot access peer cbdata */
- if (fcb_valid) {
- cbdataUnlock(fetch);
- cbdataFree(fetch);
- } else {
- cbdataUnlock(fetch);
- }
+ pd->times.retry_delay = 0;
+ peerDigestSetCheck(pd, peerDigestNewDelay(fetch->entry));
}
}
- return reason != NULL;
+
+ /* note: order is significant */
+ if (fcb_valid)
+ peerDigestFetchSetStats(fetch);
+ if (pdcb_valid)
+ peerDigestPDFinish(fetch, pcb_valid, err);
+ if (fcb_valid)
+ peerDigestFetchFinish(fetch, err);
+ if (buf)
+ memFree(MEM_4K_BUF, buf);
}
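+
+/* note (editorial): the order above matters because peerDigestFetchSetStats()
+ * needs the store entry that peerDigestFetchFinish() releases, and
+ * peerDigestPDFinish() consumes the stats that FetchSetStats() just computed */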
-/* free state structures, disables digest on error
- * must be called only when peer and fetch cbdata's are valid */
+
+/* updates digest state and stats after a fetch; destroys the digest
+ * if the peer disappeared; must be called only when fetch and pd cbdata are valid */
static void
-peerDigestFetchFinish(DigestFetchState * fetch, char *buf, const char *err_msg)
+peerDigestPDFinish(DigestFetchState * fetch, int pcb_valid, int err)
{
- peer *peer = fetch->peer;
- MemObject *mem = fetch->entry->mem_obj;
- const time_t expires = fetch->entry->expires;
- const time_t fetch_resp_time = squid_curtime - fetch->start_time;
- const int b_read = (fetch->entry->store_status == STORE_PENDING) ?
- mem->inmem_hi : mem->object_sz;
- const int req_len = fetch->request ? httpRequestPrefixLen(fetch->request) : 0;
- assert(fetch->request);
- /* final checks */
- if (!err_msg) {
- if (!peer->digest.cd)
- err_msg = "null digest (internal bug?)";
- else if (fetch->mask_offset != peer->digest.cd->mask_size)
- err_msg = "premature eof";
- else if (!peerDigestUseful(peer))
- err_msg = "useless digest";
+ PeerDigest *pd = fetch->pd;
+ const char *host = strBuf(pd->host);
+
+ pd->times.received = squid_curtime;
+ pd->times.req_delay = fetch->resp_time;
+ kb_incr(&pd->stats.sent.kbytes, (size_t)fetch->sent.bytes);
+ kb_incr(&pd->stats.recv.kbytes, (size_t)fetch->recv.bytes);
+ pd->stats.sent.msgs += fetch->sent.msg;
+ pd->stats.recv.msgs += fetch->recv.msg;
+
+ if (err) {
+ debug(72, 1) ("%sdisabling (%s) digest from %s\n",
+	    pcb_valid ? "temporarily " : "",
+ pd->req_result, host);
+
+ if (pd->cd) {
+ cacheDigestDestroy(pd->cd);
+ pd->cd = NULL;
+ }
+
+ pd->flags.usable = 0;
+
+ if (!pcb_valid)
+ peerDigestNotePeerGone(pd);
+ } else {
+ assert(pcb_valid);
+
+ pd->flags.usable = 1;
+
+ /* XXX: ugly condition, but how? */
+ if (fetch->entry->store_status == STORE_OK)
+ debug(72, 2) ("re-used old digest from %s\n", host);
+ else
+ debug(72, 2) ("received valid digest from %s\n", host);
}
+}
+
+/* free fetch state structures
+ * must be called only when fetch cbdata is valid */
+static void
+peerDigestFetchFinish(DigestFetchState * fetch, int err)
+{
+ assert(fetch->entry && fetch->request);
+
if (fetch->old_entry) {
debug(72, 2) ("peerDigestFetchFinish: deleting old entry\n");
storeUnregister(fetch->old_entry, fetch);
storeUnlockObject(fetch->old_entry);
fetch->old_entry = NULL;
}
- assert(fetch->entry);
- debug(72, 3) ("peerDigestFetchFinish: %s, read %d b, expires: %s lmt: %s\n",
- peer->host, b_read,
- mkrfc1123(fetch->entry->expires), mkrfc1123(fetch->entry->lastmod));
- if (err_msg) {
- debug(72, 1) ("disabling corrupted (%s) digest from %s\n",
- err_msg, peer->host);
- if (peer->digest.cd) {
- cacheDigestDestroy(peer->digest.cd);
- peer->digest.cd = NULL;
- }
- /* disable for a while */
- peer->digest.flags.usable = 0;
- peerDigestDelay(peer, 1,
- max_delay(
- peerDigestExpiresDelay(peer, fetch->entry),
- peerDigestNextDisDelay(peer)));
- /* release buggy entry */
- storeReleaseRequest(fetch->entry);
- } else {
- /* ugly condition, but how? */
- if (fetch->entry->store_status == STORE_OK) {
- debug(72, 2) ("re-used old digest from %s\n", peer->host);
- } else {
- debug(72, 2) ("received valid digest from %s\n", peer->host);
- }
- peer->digest.flags.usable = 1;
- peer->digest.flags.disabled = 0;
- peer->digest.last_dis_delay = 0;
- peerDigestDelay(peer, 0,
- max_delay(peerDigestExpiresDelay(peer, fetch->entry), 0));
- }
+
/* update global stats */
- /* note: outgoing numbers are not precise! @?@ */
- kb_incr(&Counter.cd.kbytes_sent, req_len);
- kb_incr(&Counter.cd.kbytes_recv, (size_t) b_read);
- Counter.cd.msgs_sent++;
- Counter.cd.msgs_recv++;
- /* update peer stats */
- kb_incr(&peer->digest.stats.kbytes_sent, req_len);
- kb_incr(&peer->digest.stats.kbytes_recv, (size_t) b_read);
- peer->digest.stats.msgs_sent++;
- peer->digest.stats.msgs_recv++;
+ kb_incr(&Counter.cd.kbytes_sent, (size_t) fetch->sent.bytes);
+ kb_incr(&Counter.cd.kbytes_recv, (size_t) fetch->recv.bytes);
+ Counter.cd.msgs_sent += fetch->sent.msg;
+ Counter.cd.msgs_recv += fetch->recv.msg;
+
/* unlock everything */
+ if (fetch->pd)
+ cbdataUnlock(fetch->pd);
storeUnregister(fetch->entry, fetch);
storeUnlockObject(fetch->entry);
requestUnlink(fetch->request);
fetch->request = NULL;
cbdataUnlock(fetch);
cbdataFree(fetch);
- fetch = NULL;
- if (buf)
- memFree(MEM_4K_BUF, buf);
- buf = NULL;
- /* set it here and in peerDigestRequest to protect against long downloads */
- peer->digest.last_req_timestamp = squid_curtime;
- peer->digest.last_fetch_resp_time = fetch_resp_time;
- peer->digest.flags.requested = 0;
- debug(72, 2) ("peerDigestFetchFinish: %s done; took: %d secs; expires: %s\n",
- peer->host, fetch_resp_time, mkrfc1123(expires));
}
+/* calculate fetch stats after completion */
+static void
+peerDigestFetchSetStats(DigestFetchState * fetch)
+{
+ MemObject *mem;
+ assert(fetch->entry && fetch->request);
+
+ mem = fetch->entry->mem_obj;
+ assert(mem);
+
+ /* XXX: outgoing numbers are not precise */
+ /* XXX: we must distinguish between 304 hits and misses here */
+ fetch->sent.bytes = httpRequestPrefixLen(fetch->request);
+ fetch->recv.bytes = fetch->entry->store_status == STORE_PENDING ?
+ mem->inmem_hi : mem->object_sz;
+ fetch->sent.msg = fetch->recv.msg = 1;
+ fetch->expires = fetch->entry->expires;
+ fetch->resp_time = squid_curtime - fetch->start_time;
+
+    debug(72, 3) ("peerDigestFetchSetStats: recv %d bytes in %d secs\n",
+ fetch->recv.bytes, fetch->resp_time);
+    debug(72, 3) ("peerDigestFetchSetStats: expires: %d (%+d), lmt: %d (%+d)\n",
+ fetch->expires, fetch->expires-squid_curtime,
+ fetch->entry->lastmod, fetch->entry->lastmod-squid_curtime);
+}
+
+
static int
-peerDigestSetCBlock(peer * peer, const char *buf)
+peerDigestSetCBlock(PeerDigest *pd, const char *buf)
{
StoreDigestCBlock cblock;
int freed_size = 0;
+ const char *host = strBuf(pd->host);
+
xmemcpy(&cblock, buf, sizeof(cblock));
/* network -> host conversions */
cblock.ver.current = ntohs(cblock.ver.current);
cblock.del_count = ntohl(cblock.del_count);
cblock.mask_size = ntohl(cblock.mask_size);
debug(72, 2) ("got digest cblock from %s; ver: %d (req: %d)\n",
- peer->host, (int) cblock.ver.current, (int) cblock.ver.required);
+ host, (int) cblock.ver.current, (int) cblock.ver.required);
debug(72, 2) ("\t size: %d bytes, e-cnt: %d, e-util: %d%%\n",
cblock.mask_size, cblock.count,
xpercentInt(cblock.count, cblock.capacity));
/* check version requirements (both ways) */
if (cblock.ver.required > CacheDigestVer.current) {
debug(72, 1) ("%s digest requires version %d; have: %d\n",
- peer->host, cblock.ver.required, CacheDigestVer.current);
+ host, cblock.ver.required, CacheDigestVer.current);
return 0;
}
if (cblock.ver.current < CacheDigestVer.required) {
debug(72, 1) ("%s digest is version %d; we require: %d\n",
- peer->host, cblock.ver.current, CacheDigestVer.required);
+ host, cblock.ver.current, CacheDigestVer.required);
return 0;
}
/* check consistency */
if (cblock.ver.required > cblock.ver.current ||
cblock.mask_size <= 0 || cblock.capacity <= 0 ||
cblock.bits_per_entry <= 0 || cblock.hash_func_count <= 0) {
- debug(72, 0) ("%s digest cblock is corrupted.\n", peer->host);
+ debug(72, 0) ("%s digest cblock is corrupted.\n", host);
return 0;
}
/* check consistency further */
if (cblock.mask_size != cacheDigestCalcMaskSize(cblock.capacity, cblock.bits_per_entry)) {
debug(72, 0) ("%s digest cblock is corrupted (mask size mismatch: %d ? %d).\n",
- peer->host, cblock.mask_size, cacheDigestCalcMaskSize(cblock.capacity, cblock.bits_per_entry));
+ host, cblock.mask_size, cacheDigestCalcMaskSize(cblock.capacity, cblock.bits_per_entry));
return 0;
}
/* there are some things we cannot do yet */
if (cblock.hash_func_count != CacheDigestHashFuncCount) {
debug(72, 0) ("%s digest: unsupported #hash functions: %d ? %d.\n",
- peer->host, cblock.hash_func_count, CacheDigestHashFuncCount);
+ host, cblock.hash_func_count, CacheDigestHashFuncCount);
return 0;
}
/*
* no cblock bugs below this point
*/
/* check size changes */
- if (peer->digest.cd && cblock.mask_size != peer->digest.cd->mask_size) {
+ if (pd->cd && cblock.mask_size != pd->cd->mask_size) {
debug(72, 2) ("%s digest changed size: %d -> %d\n",
- peer->host, cblock.mask_size, peer->digest.cd->mask_size);
- freed_size = peer->digest.cd->mask_size;
- cacheDigestDestroy(peer->digest.cd);
- peer->digest.cd = NULL;
+ host, cblock.mask_size, pd->cd->mask_size);
+ freed_size = pd->cd->mask_size;
+ cacheDigestDestroy(pd->cd);
+ pd->cd = NULL;
}
- if (!peer->digest.cd) {
+ if (!pd->cd) {
debug(72, 2) ("creating %s digest; size: %d (%+d) bytes\n",
- peer->host, cblock.mask_size, (int) (cblock.mask_size - freed_size));
- peer->digest.cd = cacheDigestCreate(cblock.capacity, cblock.bits_per_entry);
+ host, cblock.mask_size, (int) (cblock.mask_size - freed_size));
+ pd->cd = cacheDigestCreate(cblock.capacity, cblock.bits_per_entry);
if (cblock.mask_size >= freed_size)
kb_incr(&Counter.cd.memory, cblock.mask_size - freed_size);
}
- assert(peer->digest.cd);
+ assert(pd->cd);
/* these assignments leave us in an inconsistent state until we finish reading the digest */
- peer->digest.cd->count = cblock.count;
- peer->digest.cd->del_count = cblock.del_count;
+ pd->cd->count = cblock.count;
+ pd->cd->del_count = cblock.del_count;
return 1;
}
static int
-peerDigestUseful(const peer * peer)
+peerDigestUseful(const PeerDigest *pd)
{
/* TODO: we should calculate the prob of a false hit instead of bit util */
- const int bit_util = cacheDigestBitUtil(peer->digest.cd);
- if (bit_util > 75) {
+ const int bit_util = cacheDigestBitUtil(pd->cd);
+ if (bit_util > 65) {
debug(72, 0) ("Warning: %s peer digest has too many bits on (%d%%).\n",
- peer->host, bit_util);
+ strBuf(pd->host), bit_util);
return 0;
}
return 1;
}
+static int
+saneDiff(time_t diff)
+{
+ return abs(diff) > squid_curtime/2 ? 0 : diff;
+}
+
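+/* note (editorial): saneDiff() suppresses nonsensically large offsets, e.g.
+ * when a timestamp is still zero because the event never happened, so the
+ * report below prints 0 rather than a huge negative number of seconds */
+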
+void
+peerDigestStatsReport(const PeerDigest *pd, StoreEntry * e)
+{
+#define f2s(flag) (pd->flags.flag ? "yes" : "no")
+#define appendTime(tm) storeAppendPrintf(e, "%s\t %10d\t %+d\t %+d\n", \
+ ""#tm, pd->times.tm, \
+ saneDiff(pd->times.tm - squid_curtime), \
+ saneDiff(pd->times.tm - pd->times.initialized))
+
+    const char *host;
+    assert(pd);
+    host = strBuf(pd->host);
+
+ storeAppendPrintf(e, "\npeer digest from %s\n", host);
+
+ cacheDigestGuessStatsReport(&pd->stats.guess, e, host);
+
+ storeAppendPrintf(e, "\nevent\t timestamp\t secs from now\t secs from init\n");
+ appendTime(initialized);
+ appendTime(needed);
+ appendTime(requested);
+ appendTime(received);
+ appendTime(next_check);
+
+ storeAppendPrintf(e, "peer digest state:\n");
+ storeAppendPrintf(e, "\tneeded: %3s, usable: %3s, requested: %3s\n",
+ f2s(needed), f2s(usable), f2s(requested));
+ storeAppendPrintf(e, "\n\tlast retry delay: %d secs\n",
+ pd->times.retry_delay);
+ storeAppendPrintf(e, "\tlast request response time: %d secs\n",
+ pd->times.req_delay);
+ storeAppendPrintf(e, "\tlast request result: %s\n",
+ pd->req_result ? pd->req_result : "(none)");
+
+ storeAppendPrintf(e, "\npeer digest traffic:\n");
+ storeAppendPrintf(e, "\trequests sent: %d, volume: %d KB\n",
+ pd->stats.sent.msgs, (int) pd->stats.sent.kbytes.kb);
+ storeAppendPrintf(e, "\treplies recv: %d, volume: %d KB\n",
+ pd->stats.recv.msgs, (int) pd->stats.recv.kbytes.kb);
+
+ storeAppendPrintf(e, "\npeer digest structure:\n");
+ if (pd->cd)
+ cacheDigestReport(pd->cd, host, e);
+ else
+ storeAppendPrintf(e, "\tno in-memory copy\n");
+}
+
#endif