/*
- * $Id: peer_digest.cc,v 1.4 1998/04/09 00:08:48 rousskov Exp $
+ * $Id: peer_digest.cc,v 1.5 1998/04/09 11:41:27 rousskov Exp $
*
* DEBUG: section 72 Peer Digest Routines
* AUTHOR: Alex Rousskov
/* local types */
/* local prototypes */
+static void peerDigestClean(peer *p);
+static time_t peerDigestNextDisDelay(const peer *p);
+static void peerDigestDisable(peer *p);
+static void peerDigestDelay(peer *p, int disable, time_t delay);
static void peerDigestValidate(peer *p);
static void peerDigestRequest(peer *p);
static void peerDigestFetchReply(void *data, char *buf, ssize_t size);
static void peerDigestFetchFinish(DigestFetchState *fetch, char *buf, const char *err_msg);
static int peerDigestSetCBlock(peer *p, const char *buf);
static int peerDigestUpdateMask(peer *peer, int offset, const char *buf, int size);
+#define max_delay(t1,t2) ((t1) >= (t2) ? (t1) : (t2))
+
/* local constants */
#define StoreDigestCBlockSize sizeof(StoreDigestCBlock)
/* min interval for requesting digests from the same peer */
-static const time_t PeerDigestRequestMinGap = 15 * 60; /* seconds */
+static const time_t PeerDigestRequestMinGap = 3 * 60; /* seconds */
-/* min interval for requesting digests. period */
+/* min interval for requesting digests at start */
static const time_t GlobalDigestRequestMinGap = 1 * 60; /* seconds */
/* local vars */
assert(!p->digest.cd);
assert(SM_PAGE_SIZE == 4096); /* we use MEM_4K_BUF */
if (EBIT_TEST(p->options, NEIGHBOR_NO_DIGEST)) {
- EBIT_SET(p->digest.flags, PD_DISABLED);
+ peerDigestDisable(p);
} else {
cbdataLock(p);
peerDigestValidate(p);
{
if (!cbdataValid(p))
debug(72, 2) ("peerDigest: note: peer %s was reset or deleted\n", p->host);
- debug(72, 2) ("peerDigestClean: disabling peer %s digests for good\n", p->host);
assert(!EBIT_TEST(p->digest.flags, PD_REQUESTED));
- EBIT_SET(p->digest.flags, PD_DISABLED);
+ peerDigestDisable(p);
cbdataUnlock(p);
}
+/* disables peer for good */
+static void
+peerDigestDisable(peer *p)
+{
+ peerDigestDelay(p, 1, -1);
+}
+
+static time_t
+peerDigestNextDisDelay(const peer *p)
+{
+ assert(p);
+ return p->digest.last_dis_delay ?
+ 2 * p->digest.last_dis_delay : /* exponential backoff */
+ PeerDigestRequestMinGap; /* minimal delay */
+}
+
+/* delays/disables digest for a psecified delay (disables forever if negative delay) */
+static void
+peerDigestDelay(peer *p, int disable, time_t delay)
+{
+ assert(p);
+ if (disable) {
+ EBIT_SET(p->digest.flags, PD_DISABLED);
+ p->digest.last_dis_delay = delay;
+ }
+ if (delay >= 0) {
+ assert(delay || !disable);
+ debug(72, 2) ("peerDigestDelay: %s: peer %s for %d secs till %s\n",
+ disable ? "disabling" : "delaying",
+ p->host, delay, mkrfc1123(squid_curtime + delay));
+ eventAdd("peerDigestValidate", (EVH*) peerDigestValidate,
+ p, delay);
+ } else {
+ assert(disable);
+ debug(72, 2) ("peerDigestDisable: disabling peer %s for good\n",
+ p->host);
+ /* just in case, will not need it anymore */
+ EBIT_CLR(p->digest.flags, PD_USABLE);
+ }
+}
+
/* request new digest if our copy is too old; schedule next validation */
static void
peerDigestValidate(peer *p)
peerDigestClean(p);
return;
}
- debug(72, 3) ("curent time: %s\n", mkrfc1123(squid_curtime));
- if (EBIT_TEST(p->digest.flags, PD_DISABLED))
- return;
+ debug(72, 3) ("current GMT time: %s\n", mkrfc1123(squid_curtime));
assert(!EBIT_TEST(p->digest.flags, PD_REQUESTED));
- debug(72, 3) ("peerDigestValidate: %s enabled\n", p->host);
- if (p->digest.cd) {
+ debug(72, 3) ("peerDigestValidate: %s was %s disabled\n",
+ p->host, p->digest.last_dis_delay ? "" : "not");
+ if (1 /* p->digest.cd */) {
const cache_key *key;
key = storeKeyPublic(urlRInternal(p->host, p->http_port, NULL, StoreDigestUrlPath), METHOD_GET);
e = storeGet(key);
}
/* currently we rely on entry->expire information */
do_request = !e || e->expires <= squid_curtime;
- req_time = e ? e->expires : squid_curtime;
+ /* artificially increase expires to avoid race conditions */
+ req_time = e ? e->expires+PeerDigestRequestMinGap : squid_curtime;
if (req_time < squid_curtime)
req_time = squid_curtime;
/* do not request too often from one peer */
}
req_time = p->digest.last_req_timestamp + PeerDigestRequestMinGap;
}
- /* do not request too often from all peers */
- if (req_time - global_last_req_timestamp < GlobalDigestRequestMinGap) {
+ /* at start, do not request too often from all peers */
+ if (!EBIT_TEST(p->digest.flags, PD_INITED) &&
+ req_time - global_last_req_timestamp < GlobalDigestRequestMinGap) {
if (do_request) {
debug(72, 2) ("peerDigestValidate: %s, avoiding too close requests (%d secs).\n",
p->host, req_time - global_last_req_timestamp);
do_request = 0;
}
req_time = global_last_req_timestamp + GlobalDigestRequestMinGap;
+ /* otherwise we have all but one peer returning at the same moment @?@ */
+ debug(72, 5) ("peerDigestValidate: inc req_time (%+d) in anticipation of more reqs\n",
+ (int)(req_time - global_last_req_timestamp));
+ global_last_req_timestamp = req_time;
}
/* start request if needed */
if (do_request) {
static nest_level = 0;
nest_level++;
assert(nest_level == 1);
- debug(72, 2) ("peerDigestValidate: %s requesting; old e expires: %s\n",
- p->host, e ? mkrfc1123(e->expires) : "no entry", mkrfc1123(squid_curtime));
- /* will disable digests or call peerDigestValidate() */
+ debug(72, 2) ("peerDigestValidate: %s requesting; old entry expires: %s\n",
+ p->host, e ? mkrfc1123(e->expires) : "no entry");
+ /* will eventually disable digests or call peerDigest Delay */
peerDigestRequest(p);
nest_level--;
- return;
+ } else {
+ /* schedule next re-validation */
+ assert(req_time > squid_curtime);
+ peerDigestDelay(p, !p->digest.cd, req_time - squid_curtime);
}
- /* schedule next re-validation */
- eventAdd("peerDigestValidate", (EVH*) peerDigestValidate,
- p, req_time - squid_curtime);
- debug(72, 2) ("peerDigestValidate: %s scheduled for re-validation at %s\n",
- p->host, mkrfc1123(req_time));
}
/* ask peer cache for a fresh digest */
url = urlRInternal(p->host, p->http_port, "", StoreDigestUrlPath);
key = storeKeyPublic(url, METHOD_GET);
debug(72,2) ("peerDigestRequest: %s key: %s\n", url, storeKeyText(key));
- req = urlParse(METHOD_GET, url);
+ req = requestLink(urlParse(METHOD_GET, url));
assert(req);
/* add custom headers */
/* rewrite this when requests get new header interface */
storeClientListAdd(old_e, fetch);
}
e = fetch->entry = storeCreateEntry(url, url, req->flags, req->method);
- debug(72,4) ("peerDigestRequest: new entry is private: %d\n",
+ debug(72,5) ("peerDigestRequest: new entry is private: %d\n",
(int)EBIT_TEST(e->flag, KEY_PRIVATE));
storeClientListAdd(e, fetch);
/* set lastmod to trigger IMS request if possible */
if (old_e)
e->lastmod = old_e->lastmod;
fetch->offset = 0;
+ debug(72,3) ("peerDigestRequest: forwarding to protoDispatch...\n");
/* push towards peer cache */
protoDispatch(0, e, req);
storeClientCopy(e, 0, 0, SM_PAGE_SIZE, memAllocate(MEM_4K_BUF),
}
} else {
/* some kind of a bug */
- peerDigestFetchFinish(fetch, buf, "wrong status code from peer");
+ peerDigestFetchFinish(fetch, buf, httpStatusLineReason(&reply->sline));
return;
}
/* must have a ready-to-use store entry if we got here */
HttpReply *rep = fetch->entry->mem_obj->reply;
const int seen = fetch->offset + size;
assert(peer && buf && rep);
- if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestSwapInCBlock"))
+ if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestSwapInMask"))
return;
if (peerDigestUpdateMask(peer, fetch->mask_offset, buf, size)) {
fetch->offset += size;
const char *reason = NULL;
const char *no_bug = NULL;
- debug(72, 3) ("%s: %s offset: %d size: %d.\n",
+ debug(72, 6) ("%s: %s offset: %d size: %d.\n",
step_name, fetch->peer->host, fetch->offset, size);
/* test exiting conditions */
else if (!size) reason = no_bug = "eof";
else if (!fetch->entry) reason = "swap abort(?)";
else if (fetch->entry->store_status == STORE_ABORTED) reason = "swap abort";
- else if (!cbdataValid(fetch->peer)) reason = "peer disappeard";
+ else if (!cbdataValid(fetch->peer)) reason = "peer disappeared";
/* report exit reason */
if (reason) {
const time_t expires = fetch->entry->expires;
const time_t fetch_resp_time = squid_curtime - fetch->start_time;
const off_t b_read = (fetch->entry->swap_status == STORE_PENDING) ? mem->inmem_hi : mem->object_sz;
- /* set it here and in peerDigestRequest to protect against long downloads */
- peer->digest.last_req_timestamp = squid_curtime;
- peer->digest.last_fetch_resp_time = fetch_resp_time;
if (!err_msg && !peer->digest.cd)
err_msg = "null digest (internal bug?)";
if (!err_msg && fetch->mask_offset != peer->digest.cd->mask_size)
storeUnlockObject(fetch->old_entry);
fetch->old_entry = NULL;
}
- assert(NULL != fetch->entry);
- if (req) {
- requestUnlink(req);
- }
+ assert(fetch->entry);
if (err_msg) {
debug(72, 1) ("disabling corrupted (%s) digest from %s\n",
err_msg, peer->host);
cacheDigestDestroy(peer->digest.cd);
peer->digest.cd = NULL;
}
- EBIT_SET(peer->digest.flags, PD_DISABLED);
+ /* disable for a while */
EBIT_CLR(peer->digest.flags, PD_USABLE);
+ peerDigestDelay(peer, 1,
+ max_delay(expires - squid_curtime,
+ peerDigestNextDisDelay(peer)));
/* release buggy entry */
storeReleaseRequest(fetch->entry);
} else {
storeComplete(fetch->entry);
EBIT_SET(peer->digest.flags, PD_USABLE);
+ EBIT_CLR(peer->digest.flags, PD_DISABLED);
+ peer->digest.last_dis_delay = 0;
+ peerDigestDelay(peer, 0, max_delay(expires - squid_curtime, 0));
}
storeUnregister(fetch->entry, fetch);
storeUnlockObject(fetch->entry);
+ requestUnlink(req);
fetch->entry = NULL;
cbdataFree(fetch);
fetch = NULL;
memFree(MEM_4K_BUF, buf);
+ /* set it here and in peerDigestRequest to protect against long downloads */
+ peer->digest.last_req_timestamp = squid_curtime;
+ peer->digest.last_fetch_resp_time = fetch_resp_time;
EBIT_CLR(peer->digest.flags, PD_REQUESTED);
kb_incr(&Counter.cd.kbytes_recv, (size_t)b_read);
- Counter.cd.times_used++;
- debug(72, 2) ("peerDigestFetchFinish: %s took: %d secs; expires: %s\n",
+ debug(72, 2) ("peerDigestFetchFinish: %s done; took: %d secs; expires: %s\n",
peer->host, fetch_resp_time, mkrfc1123(expires));
- /* schedule next check */
- peerDigestValidate(peer);
- /* paranoid loop detection */
- assert(!EBIT_TEST(peer->digest.flags, PD_REQUESTED));
- debug(72, 3) ("peerDigestFetchFinish: %s done\n", peer->host);
}
static int