3 * DEBUG: section 90 Storage Manager Client-Side Interface
4 * AUTHOR: Duane Wessels
6 * SQUID Web Proxy Cache http://www.squid-cache.org/
7 * ----------------------------------------------------------
9 * Squid is the result of efforts by numerous individuals from
10 * the Internet community; see the CONTRIBUTORS file for full
11 * details. Many organizations have provided support for Squid's
12 * development; see the SPONSORS file for full details. Squid is
13 * Copyrighted (C) 2001 by the Regents of the University of
14 * California; see the COPYRIGHT file for full details. Squid
15 * incorporates software developed and/or copyrighted by other
16 * sources; see the CREDITS file for full details.
18 * This program is free software; you can redistribute it and/or modify
19 * it under the terms of the GNU General Public License as published by
20 * the Free Software Foundation; either version 2 of the License, or
21 * (at your option) any later version.
23 * This program is distributed in the hope that it will be useful,
24 * but WITHOUT ANY WARRANTY; without even the implied warranty of
25 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * GNU General Public License for more details.
28 * You should have received a copy of the GNU General Public License
29 * along with this program; if not, write to the Free Software
30 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
32 * Portions copyright (c) 2003 Robert Collins <robertc@squid-cache.org>
37 #include "HttpReply.h"
38 #include "HttpRequest.h"
40 #include "MemObject.h"
41 #include "mime_header.h"
42 #include "profiler/Profiler.h"
43 #include "SquidConfig.h"
44 #include "StatCounters.h"
45 #include "StoreClient.h"
47 #include "store_swapin.h"
48 #include "StoreMeta.h"
49 #include "StoreMetaUnpacker.h"
51 #include "DelayPools.h"
55 * NOTE: 'Header' refers to the swapfile metadata header.
56 * 'OBJHeader' refers to the object header, with canonical
57 * processed object headers (which may derive from FTP/HTTP etc
59 * 'Body' refers to the swapfile body, which is the full
60 * HTTP reply (including HTTP headers and body).
// File-local forward declarations: the two StoreIOState::STRCB disk-read
// callbacks, the copy driver, the EVH event handler and the quick-abort checks.
62 static StoreIOState::STRCB storeClientReadBody
;
63 static StoreIOState::STRCB storeClientReadHeader
;
64 static void storeClientCopy2(StoreEntry
* e
, store_client
* sc
);
65 static EVH storeClientCopyEvent
;
66 static bool CheckQuickAbortIsReasonable(StoreEntry
* entry
);
67 static void CheckQuickAbort(StoreEntry
* entry
);
// cbdata class setup for store_client (presumably pairs with the
// CBDATA_INIT_TYPE call made in operator new).
69 CBDATA_CLASS_INIT(store_client
);
// Class-specific allocator. The size_t argument is unnamed and unused: the
// cbdata type is initialised and the object is obtained from cbdataAlloc.
72 store_client::operator new (size_t)
74 CBDATA_INIT_TYPE(store_client
);
75 store_client
*result
= cbdataAlloc(store_client
);
// Class-specific deallocator: recovers the typed pointer from the raw address.
// NOTE(review): the statement that releases t is not visible in this view.
80 store_client::operator delete (void *address
)
82 store_client
*t
= static_cast<store_client
*>(address
);
// True only for a STORE_MEM_CLIENT whose requested copy offset
// (copyInto.offset) is strictly below anOffset.
87 store_client::memReaderHasLowerOffset(int64_t anOffset
) const
89 return getType() == STORE_MEM_CLIENT
&& copyInto
.offset
< anOffset
;
// Accessor for the client type that other code in this file compares against
// STORE_MEM_CLIENT and STORE_DISK_CLIENT.
// NOTE(review): the body is not visible in this view.
93 store_client::getType() const
// Debug-only helper (guarded by STORE_CLIENT_LIST_DEBUG): walks mem->clients
// from its head via node->next and compares each client owner against data.
// NOTE(review): the assignment of sc from each node and the return statement
// are not visible in this view.
98 #if STORE_CLIENT_LIST_DEBUG
100 storeClientListSearch(const MemObject
* mem
, void *data
)
103 store_client
*sc
= NULL
;
105 for (node
= mem
->clients
.head
; node
; node
= node
->next
) {
108 if (sc
->owner
== data
)
// Pointer-identity test: reports whether sc->owner is the same pointer as
// someClient.
116 storeClientIsThisAClient(store_client
* sc
, void *someClient
)
118 return sc
->owner
== someClient
;
122 #include "HttpRequest.h"
124 /* add client with fd to client list */
// Creates the store_client for entry e.
// Under STORE_CLIENT_LIST_DEBUG an existing client for data is looked up in
// the entry mem_obj first; a fresh store_client is then allocated for e.
// NOTE(review): the list insertion and the return statement are not visible
// in this view.
126 storeClientListAdd(StoreEntry
* e
, void *data
)
128 MemObject
*mem
= e
->mem_obj
;
131 #if STORE_CLIENT_LIST_DEBUG
133 if (storeClientListSearch(mem
, data
) != NULL
)
139 sc
= new store_client (e
);
// Hands the outcome of the current copy request to the registered handler.
//   sz    - byte count used to build the StoreIOBuffer result and to advance
//           cmp_offset (cmp_offset becomes copyInto.offset + sz)
//   error - copied into result.flags.error as 1 or 0
// result.offset reports cmp_offset as it was before this call.
// The handler and its cbdata are saved in locals, and _callback and
// copyInto.data are cleared, before the handler runs; pending() is therefore
// false while the handler executes. The handler is only invoked when
// cbdataReferenceValid(cbdata) holds, and the cbdata reference is released
// afterwards.
147 store_client::callback(ssize_t sz
, bool error
)
149 StoreIOBuffer
result(sz
, 0 ,copyInto
.data
);
152 result
.flags
.error
= 1;
155 result
.flags
.error
= error
? 1 : 0;
158 result
.offset
= cmp_offset
;
159 assert(_callback
.pending());
160 cmp_offset
= copyInto
.offset
+ sz
;
161 STCB
*temphandler
= _callback
.callback_handler
;
162 void *cbdata
= _callback
.callback_data
;
163 _callback
= Callback(NULL
, NULL
);
164 copyInto
.data
= NULL
;
166 if (cbdataReferenceValid(cbdata
))
167 temphandler(cbdata
, result
);
169 cbdataReferenceDone(cbdata
);
// Event handler that storeClientCopy2 queues through eventAdd; data is the
// store_client. Asserts that copy_event_pending was set, clears it, checks
// whether a callback is still pending, and re-enters storeClientCopy2 for the
// client entry.
// NOTE(review): the action taken when no callback is pending is not visible
// here; presumably an early return.
173 storeClientCopyEvent(void *data
)
175 store_client
*sc
= (store_client
*)data
;
176 debugs(90, 3, "storeClientCopyEvent: Running");
177 assert (sc
->flags
.copy_event_pending
);
178 sc
->flags
.copy_event_pending
= false;
180 if (!sc
->_callback
.pending())
183 storeClientCopy2(sc
->entry
, sc
);
// Binds the client to entry e; the client type comes from
// e->storeClientType(). disk_io_pending starts out false.
// A STORE_DISK_CLIENT must have something to read from disk: either a swap
// file number above -1 or an entry that is currently swapping out (asserted).
// With STORE_CLIENT_LIST_DEBUG, owner keeps a cbdata reference to data.
186 store_client::store_client(StoreEntry
*e
) : entry (e
)
190 , type (e
->storeClientType())
194 flags
.disk_io_pending
= false;
197 if (getType() == STORE_DISK_CLIENT
)
198 /* assert we'll be able to get the data we want */
199 /* maybe we should open swapin_sio here */
200 assert(entry
->swap_filen
> -1 || entry
->swappingOut());
202 #if STORE_CLIENT_LIST_DEBUG
204 owner
= cbdataReference(data
);
// Destructor. NOTE(review): the body is not visible in this view.
209 store_client::~store_client()
212 /* copy bytes requested by the client */
// Free-function entry point for a copy request: forwards the entry, the
// destination buffer, the callback and its data to sc->copy().
// NOTE(review): part of the parameter list is not visible in this view.
214 storeClientCopy(store_client
* sc
,
216 StoreIOBuffer copyInto
,
221 sc
->copy(e
, copyInto
,callback
,data
);
// Starts a copy of entry data into the caller buffer described by copyRequest.
// Preconditions (asserted): anEntry is the entry this client was created for,
// callback_fn is set, the entry is not ENTRY_ABORTED, and no earlier callback
// is still pending.
// The request is recorded (cmp_offset, _callback holding a cbdata reference to
// data, and copyInto data/length/offset), reads on the mem object are kicked,
// and storeClientCopy2 is run.
// anEntry is locked around that call because this object may be deleted
// during it (see the Bug 3480 note); afterwards only the on-stack anEntry
// pointer is used.
225 store_client::copy(StoreEntry
* anEntry
,
226 StoreIOBuffer copyRequest
,
230 assert (anEntry
== entry
);
231 assert (callback_fn
);
233 assert(!EBIT_TEST(entry
->flags
, ENTRY_ABORTED
));
234 debugs(90, 3, "store_client::copy: " << entry
->getMD5Text() << ", from " <<
235 copyRequest
.offset
<< ", for length " <<
236 (int) copyRequest
.length
<< ", cb " << callback_fn
<< ", cbdata " <<
239 #if STORE_CLIENT_LIST_DEBUG
241 assert(this == storeClientListSearch(entry
->mem_obj
, data
));
244 assert(!_callback
.pending());
245 #if ONLYCONTIGUOUSREQUESTS
247 assert(cmp_offset
== copyRequest
.offset
);
249 /* range requests will skip into the body */
250 cmp_offset
= copyRequest
.offset
;
251 _callback
= Callback (callback_fn
, cbdataReference(data
));
252 copyInto
.data
= copyRequest
.data
;
253 copyInto
.length
= copyRequest
.length
;
254 copyInto
.offset
= copyRequest
.offset
;
256 static bool copying (false);
259 PROF_start(storeClient_kickReads
);
260 /* we might be blocking comm reads due to readahead limits
261 * now we have a new offset, trigger those reads...
263 entry
->mem_obj
->kickReads();
264 PROF_stop(storeClient_kickReads
);
267 anEntry
->lock("store_client::copy"); // see deletion note below
269 storeClientCopy2(entry
, this);
271 // Bug 3480: This store_client object may be deleted now if, for example,
272 // the client rejects the hit response copied above. Use on-stack pointers!
275 anEntry
->kickProducer();
277 anEntry
->unlock("store_client::copy");
281 * This function is used below to decide if we have any more data to
282 * send to the client. If the store_status is STORE_PENDING, then we
283 * do have more data to send. If it's STORE_OK, then
284 * we continue checking. If the object length is negative, then we
285 * don't know the real length and must open the swap file to find out.
286 * If the length is >= 0, then we compare it to the requested copy
// Decides whether client sc has received everything it can get from entry e.
// Three checks are made in order: store_status equal to STORE_PENDING,
// objectLen() below zero, and the requested offset (sc->copyInto.offset)
// below the object length.
// NOTE(review): the value returned by each branch is not visible in this view.
290 storeClientNoMoreToSend(StoreEntry
* e
, store_client
* sc
)
294 if (e
->store_status
== STORE_PENDING
)
297 if ((len
= e
->objectLen()) < 0)
300 if (sc
->copyInto
.offset
< len
)
// Drives one copy attempt for client sc on entry e.
// Guards visible here: a copy event that is already queued, ENTRY_FWD_HDR_WAIT
// set on the entry, and a copy already in progress (store_copying). In the
// last case copy_event_pending is set and storeClientCopyEvent is queued with
// eventAdd (delay 0.0) instead of re-entering the copy.
// Otherwise a callback must be pending (asserted). sc is held with
// cbdataInternalLock and cbdataInternalUnlock across the copy, because the
// copy may free sc from within callbacks. store_copying is asserted to be
// false both before and after.
// NOTE(review): the copy call between the two asserts is not visible here.
307 storeClientCopy2(StoreEntry
* e
, store_client
* sc
)
309 /* reentrancy not allowed - note this could lead to
313 if (sc
->flags
.copy_event_pending
) {
317 if (EBIT_TEST(e
->flags
, ENTRY_FWD_HDR_WAIT
)) {
318 debugs(90, 5, "storeClientCopy2: returning because ENTRY_FWD_HDR_WAIT set");
322 if (sc
->flags
.store_copying
) {
323 sc
->flags
.copy_event_pending
= true;
324 debugs(90, 3, "storeClientCopy2: Queueing storeClientCopyEvent()");
325 eventAdd("storeClientCopyEvent", storeClientCopyEvent
, sc
, 0.0, 0);
329 debugs(90, 3, "storeClientCopy2: " << e
->getMD5Text());
330 assert(sc
->_callback
.pending());
332 * We used to check for ENTRY_ABORTED here. But there were some
333 * problems. For example, we might have a slow client (or two) and
334 * the server-side is reading far ahead and swapping to disk. Even
335 * if the server-side aborts, we want to give the client(s)
336 * everything we got before the abort condition occurred.
338 /* Warning: doCopy may indirectly free itself in callbacks,
339 * hence the lock to keep it active for the duration of
342 cbdataInternalLock(sc
);
343 assert (!sc
->flags
.store_copying
);
345 assert (!sc
->flags
.store_copying
);
346 cbdataInternalUnlock(sc
);
// Performs the copy for the current request. store_copying is set on entry
// and cleared again on each early-exit path visible here.
// Early exits: storeClientNoMoreToSend reports nothing left, or the entry is
// still STORE_PENDING and the requested offset is at or beyond
// mem->endOffset(), so the client has to wait for more data.
// The final visible test selects a STORE_DISK_CLIENT that has no swapin_sio
// yet; the existing comment before it explains why the swap file is opened
// before any data is sent.
350 store_client::doCopy(StoreEntry
*anEntry
)
352 assert (anEntry
== entry
);
353 flags
.store_copying
= true;
354 MemObject
*mem
= entry
->mem_obj
;
356 debugs(33, 5, "store_client::doCopy: co: " <<
357 copyInto
.offset
<< ", hi: " <<
360 if (storeClientNoMoreToSend(entry
, this)) {
361 /* There is no more to send! */
362 debugs(33, 3, HERE
<< "There is no more to send!");
364 flags
.store_copying
= false;
368 /* Check that we actually have data */
369 if (anEntry
->store_status
== STORE_PENDING
&& copyInto
.offset
>= mem
->endOffset()) {
370 debugs(90, 3, "store_client::doCopy: Waiting for more");
371 flags
.store_copying
= false;
376 * Slight weirdness here. We open a swapin file for any
377 * STORE_DISK_CLIENT, even if we can copy the requested chunk
378 * from memory in the next block. We must try to open the
379 * swapin file before sending any data to the client side. If
380 * we postpone the open, and then can not open the file later
381 * on, the client loses big time. Its transfer just gets cut
382 * off. Better to open it early (while the client side handler
383 * is clientCacheHit) so that we can fall back to a cache miss
387 if (STORE_DISK_CLIENT
== getType() && swapin_sio
== NULL
)
// Opens the swap-in file for this client.
// Paths visible here:
//   - too many disk files open: gives up and clears store_copying;
//   - no disk I/O pending: calls storeSwapInStart and gives up (clearing
//     store_copying) if swapin_sio is still NULL afterwards;
//   - presumably the remaining case, disk I/O already pending: logs the
//     averted-multiple-fd-operation warning and clears store_copying.
394 store_client::startSwapin()
396 debugs(90, 3, "store_client::doCopy: Need to open swap in file");
397 /* gotta open the swapin file */
399 if (storeTooManyDiskFilesOpen()) {
400 /* yuck -- this causes a TCP_SWAPFAIL_MISS on the client side */
402 flags
.store_copying
= false;
404 } else if (!flags
.disk_io_pending
) {
405 /* Don't set store_io_pending here */
406 storeSwapInStart(this);
408 if (swapin_sio
== NULL
) {
410 flags
.store_copying
= false;
415 * If the open succeeds we either copy from memory, or
416 * schedule a disk read in the next block.
422 debugs(90, DBG_IMPORTANT
, "WARNING: Averted multiple fd operation (1)");
423 flags
.store_copying
= false;
// Chooses the data source for the current request: the condition tests whether
// copyInto.offset lies within the in-memory window, from mem->inmem_lo
// (inclusive) up to mem->endOffset() (exclusive).
// NOTE(review): the calls made on each side of the test are not visible here;
// presumably scheduleMemRead and scheduleDiskRead.
429 store_client::scheduleRead()
431 MemObject
*mem
= entry
->mem_obj
;
433 if (copyInto
.offset
>= mem
->inmem_lo
&& copyInto
.offset
< mem
->endOffset())
// Disk path: requires a STORE_DISK_CLIENT with no disk I/O already pending
// (both asserted), and clears store_copying at the end.
// NOTE(review): the call that issues the read is not visible in this view.
440 store_client::scheduleDiskRead()
442 /* What the client wants is not in memory. Schedule a disk read */
443 assert(STORE_DISK_CLIENT
== getType());
445 assert(!flags
.disk_io_pending
);
447 debugs(90, 3, "store_client::doCopy: reading from STORE");
451 flags
.store_copying
= false;
// Memory path: copies from entry->mem_obj->data_hdr into copyInto. sz holds
// the value returned by that copy; store_copying is cleared afterwards.
// NOTE(review): how sz is delivered to the client is not visible in this view.
455 store_client::scheduleMemRead()
457 /* What the client wants is in memory */
459 debugs(90, 3, "store_client::doCopy: Copying normal from memory");
460 size_t sz
= entry
->mem_obj
->data_hdr
.copy(copyInto
);
462 flags
.store_copying
= false;
// Issues a storeRead on swapin_sio for the current request and marks
// disk_io_pending (a callback must be pending and no other disk I/O may be
// outstanding; both asserted).
// The read offset is copyInto.offset plus mem->swap_hdr_sz.
// While swap_hdr_sz is still 0 the completion is routed to
// storeClientReadHeader, otherwise to storeClientReadBody.
// When the header size is known and the entry is SWAPOUT_WRITING, the swapout
// offset is asserted to be past the requested position.
466 store_client::fileRead()
468 MemObject
*mem
= entry
->mem_obj
;
470 assert(_callback
.pending());
471 assert(!flags
.disk_io_pending
);
472 flags
.disk_io_pending
= true;
474 if (mem
->swap_hdr_sz
!= 0)
475 if (entry
->swap_status
== SWAPOUT_WRITING
)
476 assert(mem
->swapout
.sio
->offset() > copyInto
.offset
+ (int64_t)mem
->swap_hdr_sz
);
478 storeRead(swapin_sio
,
481 copyInto
.offset
+ mem
->swap_hdr_sz
,
482 mem
->swap_hdr_sz
== 0 ? storeClientReadHeader
483 : storeClientReadBody
,
// Handles len bytes of object data read from disk; also called directly by
// readHeader with the data that followed the swap header.
// Clears disk_io_pending and requires a pending callback.
// If the request starts at offset 0, data arrived, and the entry reply status
// is still Http::scNone, the reply headers are parsed from copyInto.data.
// The data is then written back into the memory object when all of these
// hold: len is positive, a reply exists, memory starts at offset 0, the object
// length is within Config.Store.maxInMemObjSize, memory_cache_disk is on, and
// the data lines up with what memory already holds (copyInto.offset equals
// endOffset, or headers were just parsed and endOffset equals rep->hdr_sz).
// inmem_lo is re-checked after storeGetMemSpace because that call may start
// freeing this object.
488 store_client::readBody(const char *buf
, ssize_t len
)
490 int parsed_header
= 0;
492 // Don't assert disk_io_pending here.. may be called by read_header
493 flags
.disk_io_pending
= false;
494 assert(_callback
.pending());
495 debugs(90, 3, "storeClientReadBody: len " << len
<< "");
497 if (copyInto
.offset
== 0 && len
> 0 && entry
->getReply()->sline
.status() == Http::scNone
) {
498 /* Our structure ! */
499 HttpReply
*rep
= (HttpReply
*) entry
->getReply(); // bypass const
501 if (!rep
->parseCharBuf(copyInto
.data
, headersEnd(copyInto
.data
, len
))) {
502 debugs(90, DBG_CRITICAL
, "Could not parse headers from on disk object");
508 const HttpReply
*rep
= entry
->getReply();
509 if (len
> 0 && rep
&& entry
->mem_obj
->inmem_lo
== 0 && entry
->objectLen() <= (int64_t)Config
.Store
.maxInMemObjSize
&& Config
.onoff
.memory_cache_disk
) {
510 storeGetMemSpace(len
);
511 // The above may start to free our object so we need to check again
512 if (entry
->mem_obj
->inmem_lo
== 0) {
513 /* Copy read data back into memory.
514 * copyInto.offset includes headers, which is what mem cache needs
516 int64_t mem_offset
= entry
->mem_obj
->endOffset();
517 if ((copyInto
.offset
== mem_offset
) || (parsed_header
&& mem_offset
== rep
->hdr_sz
)) {
518 entry
->mem_obj
->write(StoreIOBuffer(len
, copyInto
.offset
, copyInto
.data
));
530 /* synchronous open failures callback from the store,
531 * before startSwapin detects the failure.
532 * TODO: fix this inconsistent behaviour - probably by
533 * having storeSwapInStart become a callback functions,
537 if (_callback
.pending())
// STRCB adapter: casts data to store_client and forwards buf and len to
// readHeader(). The self argument is not used in the visible code.
542 storeClientReadHeader(void *data
, const char *buf
, ssize_t len
, StoreIOState::Pointer self
)
544 store_client
*sc
= (store_client
*)data
;
545 sc
->readHeader(buf
, len
);
// STRCB adapter: casts data to store_client and forwards buf and len to
// readBody(). The self argument is not used in the visible code.
549 storeClientReadBody(void *data
, const char *buf
, ssize_t len
, StoreIOState::Pointer self
)
551 store_client
*sc
= (store_client
*)data
;
552 sc
->readBody(buf
, len
);
// Parses the swap file metadata found at the start of buf (len bytes read).
// A StoreMetaUnpacker validates the buffer and builds the TLV list. Every TLV
// is checked against entry with checkConsistency, and the list is freed.
// swap_hdr_sz and the derived object_sz (swap_file_sz minus swap_hdr_sz) are
// then recorded on the mem object.
// Failure paths visible here: an insane buffer and a NULL TLV list each log a
// warning; a TLV that fails the consistency check frees the list first.
// NOTE(review): the return values for success and failure are not visible.
556 store_client::unpackHeader(char const *buf
, ssize_t len
)
558 debugs(90, 3, "store_client::unpackHeader: len " << len
<< "");
561 debugs(90, 3, "store_client::unpackHeader: " << xstrerror() << "");
567 StoreMetaUnpacker
aBuilder(buf
, len
, &swap_hdr_sz
);
569 if (!aBuilder
.isBufferSane()) {
570 /* oops, bad disk file? */
571 debugs(90, DBG_IMPORTANT
, "WARNING: swapfile header inconsistent with available data");
576 tlv
*tlv_list
= aBuilder
.createStoreMeta ();
578 if (tlv_list
== NULL
) {
579 debugs(90, DBG_IMPORTANT
, "WARNING: failed to unpack meta data");
585 * Check the meta data and make sure we got the right object.
587 for (tlv
*t
= tlv_list
; t
; t
= t
->next
) {
588 if (!t
->checkConsistency(entry
)) {
589 storeSwapTLVFree(tlv_list
);
595 storeSwapTLVFree(tlv_list
);
597 assert(swap_hdr_sz
>= 0);
598 assert(entry
->swap_file_sz
> 0);
599 assert(entry
->swap_file_sz
>= static_cast<uint64_t>(swap_hdr_sz
));
600 entry
->mem_obj
->swap_hdr_sz
= swap_hdr_sz
;
601 entry
->mem_obj
->object_sz
= entry
->swap_file_sz
- swap_hdr_sz
;
602 debugs(90, 5, "store_client::unpackHeader: swap_file_sz=" <<
603 entry
->swap_file_sz
<< "( " << swap_hdr_sz
<< " + " <<
604 entry
->mem_obj
->object_sz
<< ")");
// Handles the disk read that contains the swap header (routed here by
// fileRead while swap_hdr_sz is still 0).
// Clears disk_io_pending, unpacks the header, then computes how many of the
// len bytes read lie after the header (body_sz). If the requested offset
// falls within them, up to copyInto.length bytes are shifted to the front of
// copyInto.data with memmove and passed on to readBody.
// NOTE(review): body_sz is unsigned, so the subtraction relies on len being
// at least mem->swap_hdr_sz - TODO confirm that unpackHeader guarantees this.
608 store_client::readHeader(char const *buf
, ssize_t len
)
610 MemObject
*const mem
= entry
->mem_obj
;
612 assert(flags
.disk_io_pending
);
613 flags
.disk_io_pending
= false;
614 assert(_callback
.pending());
616 unpackHeader (buf
, len
);
622 * If our last read got some data the client wants, then give
623 * it to them, otherwise schedule another read.
625 size_t body_sz
= len
- mem
->swap_hdr_sz
;
627 if (copyInto
.offset
< static_cast<int64_t>(body_sz
)) {
629 * we have (part of) what they want
631 size_t copy_sz
= min(copyInto
.length
, body_sz
);
632 debugs(90, 3, "storeClientReadHeader: copying " << copy_sz
<< " bytes of body");
633 memmove(copyInto
.data
, copyInto
.data
+ mem
->swap_hdr_sz
, copy_sz
);
635 readBody(copyInto
.data
, copy_sz
);
641 * we don't have what the client wants, but at least we now
642 * know the swap header size.
// Checks whether client sc has a copy callback outstanding. Asserts that sc
// belongs to entry e (and, under STORE_CLIENT_LIST_DEBUG, that sc is the
// client registered for data).
// NOTE(review): the return statements are not visible in this view.
648 storeClientCopyPending(store_client
* sc
, StoreEntry
* e
, void *data
)
650 #if STORE_CLIENT_LIST_DEBUG
651 assert(sc
== storeClientListSearch(e
->mem_obj
, data
));
658 assert(sc
->entry
== e
);
666 if (!sc
->_callback
.pending())
673 * This routine hasn't been optimised to take advantage of the
// Detaches client sc from entry e.
// Steps visible here, in order:
//   - log the call, and log the no-matching-client and empty-client-list
//     cases;
//   - remove sc->node from mem->clients;
//   - test for an entry that is STORE_OK but not yet SWAPOUT_DONE;
//   - close an open swapin_sio with readerDone, clear the pointer and count
//     a swap-in in statCounter.swap.ins;
//   - handle a callback that is still pending;
//   - under STORE_CLIENT_LIST_DEBUG, release the owner reference;
//   - assert the entry is still locked and test for mem->nclients == 0.
// NOTE(review): the actions guarded by several of these tests, and the return
// values, are not visible in this view.
677 storeUnregister(store_client
* sc
, StoreEntry
* e
, void *data
)
679 MemObject
*mem
= e
->mem_obj
;
680 #if STORE_CLIENT_LIST_DEBUG
682 assert(sc
== storeClientListSearch(e
->mem_obj
, data
));
688 debugs(90, 3, "storeUnregister: called for '" << e
->getMD5Text() << "'");
691 debugs(90, 3, "storeUnregister: No matching client for '" << e
->getMD5Text() << "'");
695 if (mem
->clientCount() == 0) {
696 debugs(90, 3, "storeUnregister: Consistency failure - store client being unregistered is not in the mem object's list for '" << e
->getMD5Text() << "'");
700 dlinkDelete(&sc
->node
, &mem
->clients
);
703 if (e
->store_status
== STORE_OK
&& e
->swap_status
!= SWAPOUT_DONE
)
706 if (sc
->swapin_sio
!= NULL
) {
707 storeClose(sc
->swapin_sio
, StoreIOState::readerDone
);
708 sc
->swapin_sio
= NULL
;
709 ++statCounter
.swap
.ins
;
712 if (sc
->_callback
.pending()) {
713 /* callback with ssize = -1 to indicate unexpected termination */
714 debugs(90, 3, "storeUnregister: store_client for " << mem
->url
<< " has a callback");
718 #if STORE_CLIENT_LIST_DEBUG
719 cbdataReferenceDone(sc
->owner
);
725 assert(e
->lock_count
> 0);
727 if (mem
->nclients
== 0)
739 /* Call handlers waiting for data to be appended to E. */
// Walks mem_obj->clients and runs storeClientCopy2 for this entry on clients
// that pass the two visible tests: a callback is pending and no disk I/O is
// pending.
// The loop advances through nx rather than node->next in the for-header;
// presumably nx is captured before each client is processed so the walk
// survives a client removing itself - TODO confirm.
// The walk is bracketed by PROF_start and PROF_stop for InvokeHandlers.
741 StoreEntry::invokeHandlers()
743 /* Commit what we can to disk, if appropriate */
747 dlink_node
*nx
= NULL
;
750 PROF_start(InvokeHandlers
);
752 debugs(90, 3, "InvokeHandlers: " << getMD5Text() );
753 /* walk the entire list looking for valid callbacks */
755 for (node
= mem_obj
->clients
.head
; node
; node
= nx
) {
756 sc
= (store_client
*)node
->data
;
758 debugs(90, 3, "StoreEntry::InvokeHandlers: checking client #" << i
);
761 if (!sc
->_callback
.pending())
764 if (sc
->flags
.disk_io_pending
)
767 storeClientCopy2(this, sc
);
769 PROF_stop(InvokeHandlers
);
772 // XXX: Does not account for remote readers of local writers, causing
773 // premature StoreEntry aborts.
// Computes the client count for entry e: mem->nclients, or 0 when the entry
// has no mem_obj. The debug message indicates this count is the return value.
775 storePendingNClients(const StoreEntry
* e
)
777 MemObject
*mem
= e
->mem_obj
;
778 int npend
= NULL
== mem
? 0 : mem
->nclients
;
779 debugs(90, 3, "storePendingNClients: returning " << npend
);
783 /* return true if the request should be aborted */
// Decides whether an in-progress fetch for entry may be aborted. The checks
// run in the order below; the YES or NO verdict of each is taken from its
// debug message:
//   YES - the request is not cachable
//   YES - the entry has KEY_PRIVATE set
//   YES - no object data received yet
//   NO  - Config.quickAbort.min is negative (feature disabled)
//   NO  - range request with a negative range offset limit
//   YES - more data received than the expected length
//   NO  - remaining bytes below quickAbort.min shifted left by 10
//   YES - remaining bytes above quickAbort.max shifted left by 10
//   NO  - expectlen below 100, which would make expectlen / 100 zero in the
//         percentage division that follows
//   NO  - percentage received is above quickAbort.pct
//   YES - default
// expectlen is the reply content_length plus hdr_sz; curlen is
// mem->endOffset(). The shift by 10 presumably converts KB settings to bytes.
785 CheckQuickAbortIsReasonable(StoreEntry
* entry
)
787 MemObject
* const mem
= entry
->mem_obj
;
789 debugs(90, 3, "entry=" << entry
<< ", mem=" << mem
);
791 if (mem
->request
&& !mem
->request
->flags
.cachable
) {
792 debugs(90, 3, "quick-abort? YES !mem->request->flags.cachable");
796 if (EBIT_TEST(entry
->flags
, KEY_PRIVATE
)) {
797 debugs(90, 3, "quick-abort? YES KEY_PRIVATE");
801 int64_t expectlen
= entry
->getReply()->content_length
+ entry
->getReply()->hdr_sz
;
804 /* expectlen is < 0 if *no* information about the object has been received */
805 debugs(90, 3, "quick-abort? YES no object data received yet");
809 int64_t curlen
= mem
->endOffset();
811 if (Config
.quickAbort
.min
< 0) {
812 debugs(90, 3, "quick-abort? NO disabled");
816 if (mem
->request
&& mem
->request
->range
&& mem
->request
->getRangeOffsetLimit() < 0) {
817 /* Don't abort if the admin has configured range_offset_limit -1 to download fully for caching. */
818 debugs(90, 3, "quick-abort? NO admin configured range replies to full-download");
822 if (curlen
> expectlen
) {
823 debugs(90, 3, "quick-abort? YES bad content length");
827 if ((expectlen
- curlen
) < (Config
.quickAbort
.min
<< 10)) {
828 debugs(90, 3, "quick-abort? NO only a little more object left to receive");
832 if ((expectlen
- curlen
) > (Config
.quickAbort
.max
<< 10)) {
833 debugs(90, 3, "quick-abort? YES too much left to go");
837 if (expectlen
< 100) {
838 debugs(90, 3, "quick-abort? NO avoid FPE");
842 if ((curlen
/ (expectlen
/ 100)) > (Config
.quickAbort
.pct
)) {
843 debugs(90, 3, "quick-abort? NO past point of no return");
847 debugs(90, 3, "quick-abort? YES default");
// Runs the preconditions for a quick abort of entry, in order: any clients
// still pending, a store_status other than STORE_PENDING, the ENTRY_SPECIAL
// flag, and a negative answer from CheckQuickAbortIsReasonable.
// NOTE(review): the action taken after each test is not visible in this view;
// presumably each one returns without aborting.
852 CheckQuickAbort(StoreEntry
* entry
)
856 if (storePendingNClients(entry
) > 0)
859 if (entry
->store_status
!= STORE_PENDING
)
862 if (EBIT_TEST(entry
->flags
, ENTRY_SPECIAL
))
865 if (!CheckQuickAbortIsReasonable(entry
))
// Appends a human-readable description of this client to output: the client
// number with the callback data pointer, the copy offset and copy size, and
// the name of each flag that is set (disk_io_pending, store_copying,
// copy_event_pending), ending with a newline.
// NOTE(review): the statement guarded by the pending() test at the top is not
// visible in this view.
872 store_client::dumpStats(MemBuf
* output
, int clientNumber
) const
874 if (_callback
.pending())
877 output
->Printf("\tClient #%d, %p\n", clientNumber
, _callback
.callback_data
);
879 output
->Printf("\t\tcopy_offset: %" PRId64
"\n",
882 output
->Printf("\t\tcopy_size: %d\n",
883 (int) copyInto
.length
);
885 output
->Printf("\t\tflags:");
887 if (flags
.disk_io_pending
)
888 output
->Printf(" disk_io_pending");
890 if (flags
.store_copying
)
891 output
->Printf(" store_copying");
893 if (flags
.copy_event_pending
)
894 output
->Printf(" copy_event_pending");
896 output
->Printf("\n");
// A callback counts as pending only when both the handler and its data
// pointer are non-null.
900 store_client::Callback::pending() const
902 return callback_handler
&& callback_data
;
// Stores the handler function and its callback data; the body is empty.
905 store_client::Callback::Callback(STCB
*function
, void *data
) : callback_handler(function
), callback_data (data
) {}
909 store_client::setDelayId(DelayId delay_id
)