From: rousskov <> Date: Thu, 9 Apr 1998 04:48:08 +0000 (+0000) Subject: - Initial revision X-Git-Tag: SQUID_3_0_PRE1~3555 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=9b7de833b7dd75655b8be7c126d38a49ac2c5f69;p=thirdparty%2Fsquid.git - Initial revision --- diff --git a/src/peer_digest.cc b/src/peer_digest.cc new file mode 100644 index 0000000000..29e062609e --- /dev/null +++ b/src/peer_digest.cc @@ -0,0 +1,509 @@ + +/* + * $Id: peer_digest.cc,v 1.1 1998/04/08 22:48:08 rousskov Exp $ + * + * DEBUG: section 72 Peer Digest Routines + * AUTHOR: Alex Rousskov + * + * SQUID Internet Object Cache http://squid.nlanr.net/Squid/ + * -------------------------------------------------------- + * + * Squid is the result of efforts by numerous individuals from the + * Internet community. Development is led by Duane Wessels of the + * National Laboratory for Applied Network Research and funded by + * the National Science Foundation. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * + */ + +#include "squid.h" + +/* local types */ + +/* local prototypes */ +static void peerDigestValidate(peer *p); +static void peerDigestRequest(peer *p); +static void peerDigestFetchReply(void *data, char *buf, ssize_t size); +static void peerDigestRequest(peer *p); +static void peerDigestSwapInHeaders(void *data, char *buf, ssize_t size); +static void peerDigestSwapInCBlock(void *data, char *buf, ssize_t size); +static void peerDigestSwapInMask(void *data, char *buf, ssize_t size); +static int peerDigestFetchedEnough(DigestFetchState *fetch, char *buf, ssize_t size, const char *step_name); +static void peerDigestFetchFinish(DigestFetchState *fetch, char *buf, const char *err_msg); +static int peerDigestSetCBlock(peer *p, const char *buf); +static int peerDigestUpdateMask(peer *peer, int offset, const char *buf, int size); + +/* local constants */ +#define StoreDigestCBlockSize sizeof(StoreDigestCBlock) + +/* min interval for requesting digests from the same peer */ +static const time_t PeerDigestRequestMinGap = 1 * 60; /* seconds */ + +void +peerDigestInit(peer *p) +{ + assert(p); + assert(!p->digest.flags); + assert(!p->digest.cd); + assert(SM_PAGE_SIZE == 4096); /* we use MEM_4K_BUF */ + if (EBIT_TEST(p->options, NEIGHBOR_NO_DIGEST)) { + EBIT_SET(p->digest.flags, PD_DISABLED); + } else { + cbdataLock(p); + peerDigestValidate(p); + } + EBIT_SET(p->digest.flags, PD_INITED); +} + +/* no pending events or requests should exist when you call this */ +static void +peerDigestClean(peer *p) +{ + if (!cbdataValid(p)) + debug(72, 2) ("peerDigest: note: peer %s was reset or deleted\n", p->host); + debug(72, 2) ("peerDigestClean: disabling peer %s digests for good\n", p->host); + assert(!EBIT_TEST(p->digest.flags, PD_REQUESTED)); + EBIT_SET(p->digest.flags, PD_DISABLED); + cbdataUnlock(p); +} + +/* request new digest if our copy is too old; schedule next validation */ +static void +peerDigestValidate(peer *p) +{ + StoreEntry *e = NULL; + int do_request; + time_t req_time = squid_curtime; + assert(p); + debug(72, 3) ("peerDigestValidate: digest %s\n", p->host); + if (!cbdataValid(p)) { + peerDigestClean(p); + return; + } + debug(72, 3) ("curent time: %s\n", mkrfc1123(squid_curtime)); + if (EBIT_TEST(p->digest.flags, PD_DISABLED)) + return; + assert(!EBIT_TEST(p->digest.flags, PD_REQUESTED)); + debug(72, 3) ("peerDigestValidate: %s enabled\n", p->host); + if (p->digest.cd) { + const cache_key *key; + key = storeKeyPublic(urlRInternal(p->host, p->http_port, NULL, StoreDigestUrlPath), METHOD_GET); + e = storeGet(key); + debug(72, 3) ("peerDigestValidate: %s store entry, key: %s, exp: %s\n", + e ? "has" : "no", storeKeyText(key), mkrfc1123(e ? e->expires : 0)); + } + /* currently we rely on entry->expire information */ + do_request = !e || e->expires <= squid_curtime; + req_time = e ? e->expires : squid_curtime; + if (req_time < squid_curtime) + req_time = squid_curtime; + /* do not request too often */ + if (req_time - p->digest.last_req_timestamp < PeerDigestRequestMinGap) { + if (do_request) { + debug(72, 2) ("peerDigestValidate: %s, avoiding too close requests (%d secs).\n", + p->host, req_time - p->digest.last_req_timestamp); + do_request = 0; + } + req_time = p->digest.last_req_timestamp + PeerDigestRequestMinGap; + } + /* start request if needed */ + if (do_request) { + static nest_level = 0; + nest_level++; + assert(nest_level == 1); + debug(72, 2) ("peerDigestValidate: %s requesting; old e expires: %s\n", + p->host, e ? mkrfc1123(e->expires) : "no entry", mkrfc1123(squid_curtime)); + /* will disable digests or call peerDigestValidate() */ + peerDigestRequest(p); + nest_level--; + return; + } + /* schedule next re-validation */ + eventAdd("peerDigestValidate", (EVH*) peerDigestValidate, + p, req_time - squid_curtime); + debug(72, 2) ("peerDigestValidate: %s scheduled for re-validation at %s\n", + p->host, mkrfc1123(req_time)); +} + +/* ask peer cache for a fresh digest */ +static void +peerDigestRequest(peer *p) +{ + StoreEntry *e, *old_e; + char *url; + const cache_key *key; + request_t *req; + DigestFetchState *fetch = NULL; + assert(p); + EBIT_SET(p->digest.flags, PD_REQUESTED); + /* compute future request components */ + url = urlRInternal(p->host, p->http_port, "", StoreDigestUrlPath); + key = storeKeyPublic(url, METHOD_GET); + debug(72,2) ("peerDigestRequest: %s key: %s\n", url, storeKeyText(key)); + req = urlParse(METHOD_GET, url); + assert(req); + /* add custom headers */ + /* rewrite this when requests get new header interface */ + assert(!req->headers); + { + MemBuf mb; + memBufDefInit(&mb); + memBufPrintf(&mb, "Accept: %s,text/html\r\n", StoreDigestMimeStr); + memBufPrintf(&mb, "Cache-control: only-if-cached\r\n"); + memBufAppend(&mb, "\r\n", 2); + /* kludge! */ + assert(memBufFreeFunc(&mb) == &xfree); + req->headers = mb.buf; + req->headers_sz = mb.size; + } + /* create fetch state structure */ + fetch = memAllocate(MEM_DIGEST_FETCH_STATE); + cbdataAdd(fetch, MEM_DIGEST_FETCH_STATE); + fetch->peer = p; + fetch->start_time = squid_curtime; + p->digest.last_req_timestamp = squid_curtime; + EBIT_SET(req->flags, REQ_CACHABLE); + assert(EBIT_TEST(req->flags, REQ_CACHABLE)); /* @?@ @?@ */ + /* the rest is based on clientProcessExpired() */ + EBIT_SET(req->flags, REQ_REFRESH); + old_e = fetch->old_entry = storeGet(key); + if (old_e) { + debug(72,5) ("peerDigestRequest: found old entry\n"); + storeLockObject(old_e); + storeCreateMemObject(old_e, url, url); + storeClientListAdd(old_e, fetch); + } + e = fetch->entry = storeCreateEntry(url, url, req->flags, req->method); + debug(72,4) ("peerDigestRequest: new entry is private: %d\n", + (int)EBIT_TEST(e->flag, KEY_PRIVATE)); + storeClientListAdd(e, fetch); + /* set lastmod to trigger IMS request if possible */ + if (old_e) + e->lastmod = old_e->lastmod; + fetch->offset = 0; + /* push towards peer cache */ + protoDispatch(0, e, req); + storeClientCopy(e, 0, 0, SM_PAGE_SIZE, memAllocate(MEM_4K_BUF), + peerDigestFetchReply, fetch); +} + +/* waits for full http headers to be received and parses them */ +static void +peerDigestFetchReply(void *data, char *buf, ssize_t size) +{ + DigestFetchState *fetch = data; + peer *peer = fetch->peer; + assert(peer && buf); + assert(!fetch->offset); + if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestFetchReply")) + return; + if (headersEnd(buf, size)) { + http_status status; + HttpReply *reply = fetch->entry->mem_obj->reply; + assert(reply); + httpReplyParse(reply, buf); + status = reply->sline.status; + debug(72, 3) ("peerDigestFetchHeaders: status: %d, expires: %s\n", + status, mkrfc1123(reply->expires)); + /* this "if" is based on clientHandleIMSReply() */ + if (status == HTTP_NOT_MODIFIED) { + request_t *r = NULL; + /* our old entry is fine */ + assert(fetch->old_entry); + if (!fetch->old_entry->mem_obj->request) + fetch->old_entry->mem_obj->request = r = + requestLink(fetch->old_entry->mem_obj->request); + httpReplyUpdateOnNotModified(fetch->old_entry->mem_obj->reply, reply); + storeTimestampsSet(fetch->old_entry); + /* get rid of 304 reply */ + storeUnregister(fetch->entry, fetch); + /* paranoid assert: storeUnregister should not call us recursively */ + assert(fetch->entry); + storeUnlockObject(fetch->entry); + fetch->entry = fetch->old_entry; + fetch->old_entry = NULL; + requestUnlink(r); + fetch->entry->mem_obj->request = NULL; + } else + if (status == HTTP_OK) { + /* get rid of old entry if any */ + if (fetch->old_entry) { + debug(72, 3) ("peerDigestFetchReply: got new digest, requesting release of old digest\n"); + storeUnregister(fetch->old_entry, fetch); + storeReleaseRequest(fetch->old_entry); + storeUnlockObject(fetch->old_entry); + fetch->old_entry = NULL; + } + } else { + /* some kind of a bug */ + peerDigestFetchFinish(fetch, buf, "wrong status code from peer"); + return; + } + /* must have a ready-to-use store entry if we got here */ + /* can we stay with the old digest? */ + if (status == HTTP_NOT_MODIFIED && fetch->peer->digest.cd) + peerDigestFetchFinish(fetch, buf, NULL); + else + storeClientCopy(fetch->entry, /* have to swap in */ + 0, 0, SM_PAGE_SIZE, buf, peerDigestSwapInHeaders, fetch); + return; + } else { + /* need more data, do we have space? */ + if (size >= SM_PAGE_SIZE) + peerDigestFetchFinish(fetch, buf, "too big header"); + else + storeClientCopy(fetch->entry, size, 0, SM_PAGE_SIZE, buf, + peerDigestFetchReply, fetch); + } +} + +/* fetch headers from disk, pass on to SwapInCBlock */ +static void +peerDigestSwapInHeaders(void *data, char *buf, ssize_t size) +{ + DigestFetchState *fetch = data; + peer *peer = fetch->peer; + size_t hdr_size; + assert(peer && buf); + assert(!fetch->offset); + if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestSwapInHeaders")) + return; + if ((hdr_size = headersEnd(buf, size))) { + assert(fetch->entry->mem_obj->reply); + if (!fetch->entry->mem_obj->reply->sline.status) + httpReplyParse(fetch->entry->mem_obj->reply, buf); + assert(fetch->entry->mem_obj->reply->sline.status == HTTP_OK); + fetch->offset += hdr_size; + storeClientCopy(fetch->entry, size, fetch->offset, + SM_PAGE_SIZE, buf, + peerDigestSwapInCBlock, fetch); + } else { + /* need more data, do we have space? */ + if (size >= SM_PAGE_SIZE) + peerDigestFetchFinish(fetch, buf, "too big stored header"); + else + storeClientCopy(fetch->entry, size, 0, SM_PAGE_SIZE, buf, + peerDigestSwapInHeaders, fetch); + } +} + +static void +peerDigestSwapInCBlock(void *data, char *buf, ssize_t size) +{ + DigestFetchState *fetch = data; + peer *peer = fetch->peer; + HttpReply *rep = fetch->entry->mem_obj->reply; + const int seen = fetch->offset + size; + assert(peer && buf && rep); + if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestSwapInCBlock")) + return; + if (size >= StoreDigestCBlockSize) { + if (peerDigestSetCBlock(peer, buf)) { + fetch->offset += StoreDigestCBlockSize; + storeClientCopy(fetch->entry, seen, fetch->offset, + SM_PAGE_SIZE, buf, + peerDigestSwapInMask, fetch); + } else { + peerDigestFetchFinish(fetch, buf, "invalid digest cblock"); + } + } else { + /* need more data, do we have space? */ + if (size >= SM_PAGE_SIZE) + peerDigestFetchFinish(fetch, buf, "too big cblock"); + else + storeClientCopy(fetch->entry, size, 0, SM_PAGE_SIZE, buf, + peerDigestSwapInCBlock, fetch); + } +} + +static void +peerDigestSwapInMask(void *data, char *buf, ssize_t size) +{ + DigestFetchState *fetch = data; + peer *peer = fetch->peer; + HttpReply *rep = fetch->entry->mem_obj->reply; + const int seen = fetch->offset + size; + assert(peer && buf && rep); + if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestSwapInCBlock")) + return; + if (peerDigestUpdateMask(peer, fetch->mask_offset, buf, size)) { + fetch->offset += size; + fetch->mask_offset += size; + storeClientCopy(fetch->entry, seen, fetch->offset, + SM_PAGE_SIZE, buf, + peerDigestSwapInMask, fetch); + } else { + peerDigestFetchFinish(fetch, buf, "invalid mask"); + } +} + +static int +peerDigestFetchedEnough(DigestFetchState *fetch, char *buf, ssize_t size, const char *step_name) +{ + const char *reason = NULL; + const char *no_bug = NULL; + + debug(72, 3) ("%s: %s offset: %d size: %d.\n", + step_name, fetch->peer->host, fetch->offset, size); + + /* test exiting conditions */ + if (size < 0) reason = "swap failure"; + else if (!size) reason = no_bug = "eof"; + else if (!fetch->entry) reason = "swap abort(?)"; + else if (fetch->entry->store_status == STORE_ABORTED) reason = "swap abort"; + else if (!cbdataValid(fetch->peer)) reason = "peer disappeard"; + + /* report exit reason */ + if (reason) { + debug(72, 3) ("%s: exiting on %s\n", step_name, reason); + peerDigestFetchFinish(fetch, buf, no_bug ? NULL : reason); + } + return reason != NULL; +} + +/* free state structures, disables digest on error */ +/* this probably should mimic httpRequestFree() but it does not! @?@ @?@ */ +static void +peerDigestFetchFinish(DigestFetchState *fetch, char *buf, const char *err_msg) +{ + peer *peer = fetch->peer; + MemObject *mem = fetch->entry->mem_obj; + request_t *req = mem->request; + const time_t expires = fetch->entry->expires; + const time_t fetch_resp_time = squid_curtime - fetch->start_time; + const off_t b_read = (fetch->entry->swap_status == STORE_PENDING) ? mem->inmem_hi : mem->object_sz; + /* set it here and in peerDigestRequest to protect against long downloads */ + peer->digest.last_req_timestamp = squid_curtime; + peer->digest.last_fetch_resp_time = fetch_resp_time; + if (!err_msg && !peer->digest.cd) + err_msg = "null digest (internal bug?)"; + if (!err_msg && fetch->mask_offset != peer->digest.cd->mask_size) + err_msg = "premature eof"; + if (fetch->old_entry) { + debug(72,2) ("peerDigestFetchFinish: deleting old entry\n"); + storeUnregister(fetch->old_entry, fetch); + storeReleaseRequest(fetch->old_entry); + storeUnlockObject(fetch->old_entry); + fetch->old_entry = NULL; + } + assert(NULL != fetch->entry); + if (req) { + requestUnlink(req); + } + if (err_msg) { + debug(72, 1) ("disabling corrupted (%s) digest from %s\n", + err_msg, peer->host); + if (peer->digest.cd) { + cacheDigestDestroy(peer->digest.cd); + peer->digest.cd = NULL; + } + EBIT_SET(peer->digest.flags, PD_DISABLED); + EBIT_CLR(peer->digest.flags, PD_USABLE); + /* release buggy entry */ + storeReleaseRequest(fetch->entry); + } else { + storeComplete(fetch->entry); + EBIT_SET(peer->digest.flags, PD_USABLE); + } + storeUnregister(fetch->entry, fetch); + storeUnlockObject(fetch->entry); + fetch->entry = NULL; + cbdataFree(fetch); + fetch = NULL; + memFree(MEM_4K_BUF, buf); + EBIT_CLR(peer->digest.flags, PD_REQUESTED); + kb_incr(&Counter.cd.kbytes_recv, (size_t)b_read); + Counter.cd.times_used++; + debug(72, 2) ("peerDigestFetchFinish: %s took: %d secs; expires: %s\n", + peer->host, fetch_resp_time, mkrfc1123(expires)); + /* schedule next check */ + peerDigestValidate(peer); + /* paranoid loop detection */ + assert(!EBIT_TEST(peer->digest.flags, PD_REQUESTED)); + debug(72, 3) ("peerDigestFetchFinish: %s done\n", peer->host); +} + +static int +peerDigestSetCBlock(peer *peer, const char *buf) +{ + StoreDigestCBlock cblock; + int freed_size = 0; + xmemcpy(&cblock, buf, sizeof(cblock)); + /* network -> host conversions */ + cblock.ver.current = ntohs(cblock.ver.current); + cblock.ver.required = ntohs(cblock.ver.required); + cblock.capacity = ntohl(cblock.capacity); + cblock.count = ntohl(cblock.count); + cblock.del_count = ntohl(cblock.del_count); + cblock.mask_size = ntohl(cblock.mask_size); + debug(72,2) ("got digest cblock from %s; ver: %d (req: %d)\n", + peer->host, (int)cblock.ver.current, (int)cblock.ver.required); + debug(72,2) ("\t size: %d bytes, e-cnt: %d, e-util: %d%%\n", + cblock.mask_size, cblock.count, + xpercentInt(cblock.count, cblock.capacity)); + /* check version requirements */ + if (cblock.ver.required > CacheDigestVer.current) { + debug(72,1) ("%s digest requires version %d; have: %d\n", + peer->host, cblock.ver.required, CacheDigestVer.current); + return 0; + } + /* check consistency */ + if (cblock.ver.required > cblock.ver.current || + cblock.mask_size <= 0 || cblock.capacity <= 0) { + debug(72,0) ("%s digest cblock is corrupted.\n", peer->host); + return 0; + } + /* + * no cblock bugs below this point + */ + /* check size changes */ + if (peer->digest.cd && cblock.mask_size != peer->digest.cd->mask_size) { + debug(72,2) ("%s digest changed size: %d -> %d\n", + peer->host, cblock.mask_size, peer->digest.cd->mask_size); + freed_size = peer->digest.cd->mask_size; + cacheDigestDestroy(peer->digest.cd); + peer->digest.cd = NULL; + } + if (!peer->digest.cd) { + debug(72,2) ("cloning %s digest; size: %d (%+d) bytes\n", + peer->host, cblock.mask_size, (int) (cblock.mask_size - freed_size)); + peer->digest.cd = cacheDigestSizedCreate(cblock.mask_size, cblock.capacity); + if (cblock.mask_size >= freed_size) + kb_incr(&Counter.cd.memory, cblock.mask_size - freed_size); + } + /* these assignments leave us in an inconsistent state until we finish reading the digest */ + peer->digest.cd->count = cblock.count; + peer->digest.cd->del_count = cblock.del_count; + return 1; +} + +/* updates current mask. checks for overflows */ +static int +peerDigestUpdateMask(peer *peer, int offset, const char *buf, int size) +{ + if (size) { + assert(offset >= 0); + assert(peer->digest.cd); + if (offset + size > peer->digest.cd->mask_size) { + debug(72,0) ("peerDigestUpdateMask: %s digest is larger than expected: %d > %d\n", + peer->host, offset + size, peer->digest.cd->mask_size); + return 0; + } + xmemcpy(peer->digest.cd->mask + offset, buf, size); + } + return 1; +} +