5 * DEBUG: section 90 Storage Manager Client-Side Interface
6 * AUTHOR: Duane Wessels
8 * SQUID Web Proxy Cache http://www.squid-cache.org/
9 * ----------------------------------------------------------
11 * Squid is the result of efforts by numerous individuals from
12 * the Internet community; see the CONTRIBUTORS file for full
13 * details. Many organizations have provided support for Squid's
14 * development; see the SPONSORS file for full details. Squid is
15 * Copyrighted (C) 2001 by the Regents of the University of
16 * California; see the COPYRIGHT file for full details. Squid
17 * incorporates software developed and/or copyrighted by other
18 * sources; see the CREDITS file for full details.
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
30 * You should have received a copy of the GNU General Public License
31 * along with this program; if not, write to the Free Software
32 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
34 * Portions copyright (c) 2003 Robert Collins <robertc@squid-cache.org>
39 #include "StoreClient.h"
41 #include "HttpReply.h"
42 #include "MemObject.h"
43 #include "StoreMeta.h"
44 #include "StoreMetaUnpacker.h"
46 #include "DelayPools.h"
48 #include "HttpRequest.h"
52 * NOTE: 'Header' refers to the swapfile metadata header.
53 * 'OBJHeader' refers to the object header, with canonical
54 * processed object headers (which may derive from FTP/HTTP etc
56 * 'Body' refers to the swapfile body, which is the full
57 * HTTP reply (including HTTP headers and body).
// Forward declarations of the file-local (static) helpers defined further
// down in this file: the two disk-read completion handlers, the copy driver
// and its deferred-event handler, and the two quick-abort checks.
59 static StoreIOState::STRCB storeClientReadBody
;
60 static StoreIOState::STRCB storeClientReadHeader
;
61 static void storeClientCopy2(StoreEntry
* e
, store_client
* sc
);
62 static EVH storeClientCopyEvent
;
63 static int CheckQuickAbort2(StoreEntry
* entry
);
64 static void CheckQuickAbort(StoreEntry
* entry
);
66 CBDATA_CLASS_INIT(store_client
);
// Class-specific allocator for store_client: runs CBDATA_INIT_TYPE for the
// type and then obtains the storage from cbdataAlloc. The size argument is
// unnamed and not used in the visible lines.
// NOTE(review): the statement returning `result` is not visible in this excerpt.
69 store_client::operator new (size_t)
71 CBDATA_INIT_TYPE(store_client
);
72 store_client
*result
= cbdataAlloc(store_client
);
// Class-specific deallocator: converts the raw address back to a
// store_client pointer.
// NOTE(review): the call that actually releases `t` is not visible in this
// excerpt (presumably the cbdata counterpart of cbdataAlloc) - confirm
// against the full file.
77 store_client::operator delete (void *address
)
79 store_client
*t
= static_cast<store_client
*>(address
);
// True when this client is a memory client (STORE_MEM_CLIENT) whose current
// copy offset (copyInto.offset) lies strictly below anOffset.
84 store_client::memReaderHasLowerOffset(int64_t anOffset
) const
86 return getType() == STORE_MEM_CLIENT
&& copyInto
.offset
< anOffset
;
90 store_client::getType() const
95 #if STORE_CLIENT_LIST_DEBUG
// Lookup helper that appears after the STORE_CLIENT_LIST_DEBUG guard: walks
// mem->clients from the head, following node->next, and compares each
// client's owner against `data`.
// NOTE(review): the declaration of `node`, the assignment of `sc` from the
// current node and the return statements are not visible in this excerpt.
97 storeClientListSearch(const MemObject
* mem
, void *data
)
100 store_client
*sc
= NULL
;
102 for (node
= mem
->clients
.head
; node
; node
= node
->next
) {
105 if (sc
->owner
== data
)
// Non-zero when `someClient` is the owner recorded on store client `sc`.
113 storeClientIsThisAClient(store_client
* sc
, void *someClient
)
115 return sc
->owner
== someClient
;
119 #include "HttpRequest.h"
121 /* add client with fd to client list */
// Creates a new store_client for entry `e`. Under STORE_CLIENT_LIST_DEBUG
// the mem object's client list is first searched for a client already
// registered for `data`.
// NOTE(review): the code that links the new client into the mem object's
// list and the return statement are not visible in this excerpt.
123 storeClientListAdd(StoreEntry
* e
, void *data
)
125 MemObject
*mem
= e
->mem_obj
;
128 #if STORE_CLIENT_LIST_DEBUG
130 if (storeClientListSearch(mem
, data
) != NULL
)
136 sc
= new store_client (e
);
// Delivers the outcome of a copy request to the registered handler.
//   sz    - byte count placed in the StoreIOBuffer result and used to
//           advance cmp_offset
//   error - copied into result.flags.error
// The handler and its data pointer are saved in locals and _callback is
// reset to Callback(NULL, NULL) before the handler runs, so pending() is
// already false while the handler executes. The handler is only invoked
// when cbdataReferenceValid() accepts the data pointer; cbdataReferenceDone()
// is then called on that pointer.
// NOTE(review): the conditions selecting between the two
// result.flags.error assignments are not visible in this excerpt.
144 store_client::callback(ssize_t sz
, bool error
)
146 StoreIOBuffer
result(sz
, 0 ,copyInto
.data
);
149 result
.flags
.error
= 1;
152 result
.flags
.error
= error
? 1 : 0;
// Report the offset recorded for this request, then move cmp_offset past
// the bytes being delivered.
155 result
.offset
= cmp_offset
;
156 assert(_callback
.pending());
157 cmp_offset
= copyInto
.offset
+ sz
;
158 STCB
*temphandler
= _callback
.callback_handler
;
159 void *cbdata
= _callback
.callback_data
;
160 _callback
= Callback(NULL
, NULL
);
161 copyInto
.data
= NULL
;
163 if (cbdataReferenceValid(cbdata
))
164 temphandler(cbdata
, result
);
166 cbdataReferenceDone(cbdata
);
// Event handler queued by storeClientCopy2() through eventAdd(); `data` is
// the store_client. Asserts that copy_event_pending was set, clears it, and
// calls storeClientCopy2() for the client's entry.
// NOTE(review): the statement guarded by the !pending() test is not visible
// in this excerpt (presumably an early return when no callback is waiting).
170 storeClientCopyEvent(void *data
)
172 store_client
*sc
= (store_client
*)data
;
173 debugs(90, 3, "storeClientCopyEvent: Running");
174 assert (sc
->flags
.copy_event_pending
);
175 sc
->flags
.copy_event_pending
= 0;
177 if (!sc
->_callback
.pending())
180 storeClientCopy2(sc
->entry
, sc
);
// Builds a client for entry `e`; the client type is taken from
// e->storeClientType(). A disk client must either already have a swap file
// (swap_filen > -1) or belong to an entry that is still swapOutAble()
// (asserted).
// NOTE(review): further member initialisers are not visible in this excerpt.
// NOTE(review): under STORE_CLIENT_LIST_DEBUG the body references `data`,
// which is not a parameter in the visible signature - confirm that the
// debug build still compiles.
183 store_client::store_client(StoreEntry
*e
) : entry (e
)
187 , type (e
->storeClientType())
191 flags
.disk_io_pending
= 0;
194 if (getType() == STORE_DISK_CLIENT
)
195 /* assert we'll be able to get the data we want */
196 /* maybe we should open swapin_fd here */
197 assert(entry
->swap_filen
> -1 || entry
->swapOutAble());
199 #if STORE_CLIENT_LIST_DEBUG
201 owner
= cbdataReference(data
);
206 store_client::~store_client()
209 /* copy bytes requested by the client */
// Free-function entry point that forwards a copy request to sc->copy()
// with the entry, destination buffer, handler and handler data.
// NOTE(review): the `e`, `callback` and `data` parameters are used below
// but their declarations are not visible in this excerpt.
211 storeClientCopy(store_client
* sc
,
213 StoreIOBuffer copyInto
,
218 sc
->copy(e
, copyInto
,callback
,data
);
// Records a new copy request on this client and starts servicing it.
// Asserted preconditions: anEntry is this client's entry, a handler was
// supplied, the entry is not flagged ENTRY_ABORTED, and no earlier request
// is still pending. The request's data/length/offset are stored in
// copyInto, cmp_offset is set to the requested offset, and the handler is
// stored in _callback together with a cbdataReference on its data. The
// entry's mem_obj is then asked to kickReads() and storeClientCopy2() is
// called for this client.
// NOTE(review): the handler/data parameters and several statements of this
// function are not visible in this excerpt.
222 store_client::copy(StoreEntry
* anEntry
,
223 StoreIOBuffer copyRequest
,
227 assert (anEntry
== entry
);
228 assert (callback_fn
);
230 assert(!EBIT_TEST(entry
->flags
, ENTRY_ABORTED
));
231 debugs(90, 3, "store_client::copy: " << entry
->getMD5Text() << ", from " <<
232 copyRequest
.offset
<< ", for length " <<
233 (int) copyRequest
.length
<< ", cb " << callback_fn
<< ", cbdata " <<
236 #if STORE_CLIENT_LIST_DEBUG
238 assert(this == storeClientListSearch(entry
->mem_obj
, data
));
241 assert(!_callback
.pending());
242 #if ONLYCONTIGUOUSREQUESTS
244 assert(cmp_offset
== copyRequest
.offset
);
246 /* range requests will skip into the body */
247 cmp_offset
= copyRequest
.offset
;
248 _callback
= Callback (callback_fn
, cbdataReference(data
));
249 copyInto
.data
= copyRequest
.data
;
250 copyInto
.length
= copyRequest
.length
;
251 copyInto
.offset
= copyRequest
.offset
;
253 static bool copying (false);
256 PROF_start(storeClient_kickReads
);
257 /* we might be blocking comm reads due to readahead limits
258 * now we have a new offset, trigger those reads...
260 entry
->mem_obj
->kickReads();
261 PROF_stop(storeClient_kickReads
);
264 storeClientCopy2(entry
, this);
268 * This function is used below to decide if we have any more data to
269 * send to the client. If the store_status is STORE_PENDING, then we
270 * do have more data to send. If it's STORE_OK, then
271 * we continue checking. If the object length is negative, then we
272 * don't know the real length and must open the swap file to find out.
273 * If the length is >= 0, then we compare it to the requested copy
// Three tests in order: the entry is still STORE_PENDING; the object length
// from e->objectLen() is negative; the client's requested offset
// (sc->copyInto.offset) is below that length.
// NOTE(review): the declaration of `len` and the value returned from each
// branch are not visible in this excerpt.
277 storeClientNoMoreToSend(StoreEntry
* e
, store_client
* sc
)
281 if (e
->store_status
== STORE_PENDING
)
284 if ((len
= e
->objectLen()) < 0)
287 if (sc
->copyInto
.offset
< len
)
// Attempts to satisfy the pending copy request of `sc` on entry `e`.
// Visible guards: a copy event is already pending, ENTRY_FWD_HDR_WAIT is
// set on the entry, or a copy is already in progress (store_copying). In
// the last case copy_event_pending is set and storeClientCopyEvent is
// queued through eventAdd(). Past the guards a callback must be pending
// (asserted) and the client is bracketed by cbdataInternalLock /
// cbdataInternalUnlock, with store_copying asserted clear on both sides.
// NOTE(review): how each guard exits and the call made between the two
// store_copying asserts are not visible in this excerpt.
294 storeClientCopy2(StoreEntry
* e
, store_client
* sc
)
296 /* reentrancy not allowed - note this could lead to
300 if (sc
->flags
.copy_event_pending
) {
304 if (EBIT_TEST(e
->flags
, ENTRY_FWD_HDR_WAIT
)) {
305 debugs(90, 5, "storeClientCopy2: returning because ENTRY_FWD_HDR_WAIT set");
309 if (sc
->flags
.store_copying
) {
310 sc
->flags
.copy_event_pending
= 1;
311 debugs(90, 3, "storeClientCopy2: Queueing storeClientCopyEvent()");
312 eventAdd("storeClientCopyEvent", storeClientCopyEvent
, sc
, 0.0, 0);
316 debugs(90, 3, "storeClientCopy2: " << e
->getMD5Text());
317 assert(sc
->_callback
.pending());
319 * We used to check for ENTRY_ABORTED here. But there were some
320 * problems. For example, we might have a slow client (or two) and
321 * the server-side is reading far ahead and swapping to disk. Even
322 * if the server-side aborts, we want to give the client(s)
323 * everything we got before the abort condition occurred.
325 /* Warning: doCopy may indirectly free itself in callbacks,
326 * hence the lock to keep it active for the duration of
329 cbdataInternalLock(sc
);
330 assert (sc
->flags
.store_copying
== 0);
332 assert (sc
->flags
.store_copying
== 0);
333 cbdataInternalUnlock(sc
);
// Core of one copy attempt. Sets store_copying on entry and clears it again
// on each of the exits visible below:
//  - storeClientNoMoreToSend() is true: logged, flag cleared
//  - the entry is still STORE_PENDING and the requested offset is at or
//    beyond mem->endOffset(): "Waiting for more", flag cleared
// The final test singles out a disk client that has no swap-in handle yet.
// NOTE(review): the statements following each branch (callback, returns,
// and what happens for the disk-client test) are not visible in this excerpt.
337 store_client::doCopy(StoreEntry
*anEntry
)
339 assert (anEntry
== entry
);
340 flags
.store_copying
= 1;
341 MemObject
*mem
= entry
->mem_obj
;
343 debugs(33, 5, "store_client::doCopy: co: " <<
344 copyInto
.offset
<< ", hi: " <<
347 if (storeClientNoMoreToSend(entry
, this)) {
348 /* There is no more to send! */
349 debugs(33, 3, HERE
<< "There is no more to send!");
351 flags
.store_copying
= 0;
355 /* Check that we actually have data */
356 if (anEntry
->store_status
== STORE_PENDING
&& copyInto
.offset
>= mem
->endOffset()) {
357 debugs(90, 3, "store_client::doCopy: Waiting for more");
358 flags
.store_copying
= 0;
363 * Slight weirdness here. We open a swapin file for any
364 * STORE_DISK_CLIENT, even if we can copy the requested chunk
365 * from memory in the next block. We must try to open the
366 * swapin file before sending any data to the client side. If
367 * we postpone the open, and then can not open the file later
368 * on, the client loses big time. Its transfer just gets cut
369 * off. Better to open it early (while the client side handler
370 * is clientCacheHit) so that we can fall back to a cache miss
374 if (STORE_DISK_CLIENT
== getType() && swapin_sio
== NULL
)
// Opens the swap-in file for this client. Visible outcomes:
//  - storeTooManyDiskFilesOpen(): store_copying is cleared
//  - no disk I/O pending: storeSwapInStart(this) is called; if swapin_sio is
//    still NULL afterwards, store_copying is cleared
//  - a "multiple fd operation" warning path that also clears store_copying
// NOTE(review): how failure is reported to the caller, what follows a
// successful open, and the condition guarding the warning path are not
// visible in this excerpt.
381 store_client::startSwapin()
383 debugs(90, 3, "store_client::doCopy: Need to open swap in file");
384 /* gotta open the swapin file */
386 if (storeTooManyDiskFilesOpen()) {
387 /* yuck -- this causes a TCP_SWAPFAIL_MISS on the client side */
389 flags
.store_copying
= 0;
391 } else if (!flags
.disk_io_pending
) {
392 /* Don't set store_io_pending here */
393 storeSwapInStart(this);
395 if (swapin_sio
== NULL
) {
397 flags
.store_copying
= 0;
402 * If the open succeeds we either copy from memory, or
403 * schedule a disk read in the next block.
409 debugs(90, 1, "WARNING: Averted multiple fd operation (1)");
410 flags
.store_copying
= 0;
// Tests whether the requested offset lies inside the in-memory window
// [mem->inmem_lo, mem->endOffset()).
// NOTE(review): the two branches taken on this test are not visible in this
// excerpt (presumably a memory read versus a disk read).
416 store_client::scheduleRead()
418 MemObject
*mem
= entry
->mem_obj
;
420 if (copyInto
.offset
>= mem
->inmem_lo
&& copyInto
.offset
< mem
->endOffset())
// Disk path of a copy attempt: only valid for a STORE_DISK_CLIENT with no
// disk I/O already in flight (both asserted). store_copying is cleared at
// the end of the visible lines.
// NOTE(review): the call that actually starts the read is not visible in
// this excerpt.
427 store_client::scheduleDiskRead()
429 /* What the client wants is not in memory. Schedule a disk read */
430 assert(STORE_DISK_CLIENT
== getType());
432 assert(!flags
.disk_io_pending
);
434 debugs(90, 3, "store_client::doCopy: reading from STORE");
438 flags
.store_copying
= 0;
// Memory path of a copy attempt: mem_obj->data_hdr.copy(copyInto) is called
// with the request buffer and its result is kept in `sz`; store_copying is
// then cleared.
// NOTE(review): what is done with `sz` afterwards is not visible in this
// excerpt.
442 store_client::scheduleMemRead()
444 /* What the client wants is in memory */
446 debugs(90, 3, "store_client::doCopy: Copying normal from memory");
447 size_t sz
= entry
->mem_obj
->data_hdr
.copy(copyInto
);
449 flags
.store_copying
= 0;
// Issues a disk read for the pending request through storeRead() on
// swapin_sio. Requires a pending callback and no disk I/O already in flight
// (both asserted); disk_io_pending is set before the read is issued.
// The read position is copyInto.offset + mem->swap_hdr_sz. While
// swap_hdr_sz is still 0 the completion handler is storeClientReadHeader,
// otherwise storeClientReadBody.
// When the header size is known and the entry is still SWAPOUT_WRITING,
// the swap-out offset must already be past the requested position (asserted).
// NOTE(review): the buffer, length and callback-data arguments passed to
// storeRead are not visible in this excerpt.
453 store_client::fileRead()
455 MemObject
*mem
= entry
->mem_obj
;
457 assert(_callback
.pending());
458 assert(!flags
.disk_io_pending
);
459 flags
.disk_io_pending
= 1;
461 if (mem
->swap_hdr_sz
!= 0)
462 if (entry
->swap_status
== SWAPOUT_WRITING
)
463 assert(mem
->swapout
.sio
->offset() > copyInto
.offset
+ (int64_t)mem
->swap_hdr_sz
);
465 storeRead(swapin_sio
,
468 copyInto
.offset
+ mem
->swap_hdr_sz
,
469 mem
->swap_hdr_sz
== 0 ? storeClientReadHeader
470 : storeClientReadBody
,
// Completion handler for a body read issued by store_client::fileRead().
// `data` is the store_client; `len` is the byte count reported by the read.
// Clears disk_io_pending (asserting it was set) and requires a pending
// callback. For a request at offset 0 with len > 0, when the entry's reply
// has no status yet (sline.status == 0), the reply is parsed from the bytes
// in copyInto.data up to headersEnd(); a parse failure is logged.
// NOTE(review): `buf` and `self` are not referenced in the visible lines,
// and the statements after the parse block are not visible in this excerpt.
475 storeClientReadBody(void *data
, const char *buf
, ssize_t len
, StoreIOState::Pointer self
)
477 store_client
*sc
= (store_client
*)data
;
478 assert(sc
->flags
.disk_io_pending
);
479 sc
->flags
.disk_io_pending
= 0;
480 assert(sc
->_callback
.pending());
481 debugs(90, 3, "storeClientReadBody: len " << len
<< "");
483 if (sc
->copyInto
.offset
== 0 && len
> 0 && sc
->entry
->getReply()->sline
.status
== 0) {
484 /* Our structure ! */
485 HttpReply
*rep
= (HttpReply
*) sc
->entry
->getReply(); // bypass const
487 if (!rep
->parseCharBuf(sc
->copyInto
.data
, headersEnd(sc
->copyInto
.data
, len
))) {
488 debugs(90, 0, "Could not parse headers from on disk object");
499 /* synchronous open failures callback from the store,
500 * before startSwapin detects the failure.
501 * TODO: fix this inconsistent behaviour - probably by
502 * having storeSwapInStart become a callback function,
506 if (_callback
.pending())
// Completion handler used by store_client::fileRead() while the swap header
// size is still unknown: casts `data` to the store_client and forwards
// buf/len to its readHeader(). `self` is not referenced in the visible lines.
511 storeClientReadHeader(void *data
, const char *buf
, ssize_t len
, StoreIOState::Pointer self
)
513 store_client
*sc
= (store_client
*)data
;
514 sc
->readHeader(buf
, len
);
// Parses the swap-file metadata header held in buf (len bytes).
// A StoreMetaUnpacker validates the buffer (isBufferSane) and builds a TLV
// list (createStoreMeta); every TLV is checked against this client's entry
// with checkConsistency(). The list is freed with storeSwapTLVFree() both
// when a TLV fails the check and after the loop. At the end the header size
// reported by the unpacker is stored in mem_obj->swap_hdr_sz and
// mem_obj->object_sz becomes swap_file_sz minus that header size.
// NOTE(review): the failure exits (after each WARNING and after an
// inconsistent TLV) and the declaration of swap_hdr_sz are not visible in
// this excerpt.
518 store_client::unpackHeader(char const *buf
, ssize_t len
)
520 debugs(90, 3, "store_client::unpackHeader: len " << len
<< "");
523 debugs(90, 3, "store_client::unpackHeader: " << xstrerror() << "");
529 StoreMetaUnpacker
aBuilder(buf
, len
, &swap_hdr_sz
);
531 if (!aBuilder
.isBufferSane()) {
532 /* oops, bad disk file? */
533 debugs(90, 1, "WARNING: swapfile header inconsistent with available data");
538 tlv
*tlv_list
= aBuilder
.createStoreMeta ();
540 if (tlv_list
== NULL
) {
541 debugs(90, 1, "WARNING: failed to unpack meta data");
547 * Check the meta data and make sure we got the right object.
549 for (tlv
*t
= tlv_list
; t
; t
= t
->next
) {
550 if (!t
->checkConsistency(entry
)) {
551 storeSwapTLVFree(tlv_list
);
557 storeSwapTLVFree(tlv_list
);
559 entry
->mem_obj
->swap_hdr_sz
= swap_hdr_sz
;
560 entry
->mem_obj
->object_sz
= entry
->swap_file_sz
- swap_hdr_sz
;
// Handles completion of a read that was issued while the swap header size
// was still unknown (reached through storeClientReadHeader).
// Clears disk_io_pending, unpacks the metadata with unpackHeader(), then
// treats the bytes after the header (body_sz = len - mem->swap_hdr_sz) as
// object data: if the requested offset falls inside them, up to
// copyInto.length bytes are moved to the front of copyInto.data with
// xmemmove, and for an offset-0 request on an entry whose reply has no
// status yet the reply headers are parsed from those bytes.
// NOTE(review): body_sz is a size_t computed from len - swap_hdr_sz; the
// visible lines show no check that len >= swap_hdr_sz before the
// subtraction - confirm that a failed unpackHeader() cannot reach it.
// NOTE(review): the statements that deliver the data or schedule another
// read are not visible in this excerpt.
565 store_client::readHeader(char const *buf
, ssize_t len
)
567 MemObject
*const mem
= entry
->mem_obj
;
569 assert(flags
.disk_io_pending
);
570 flags
.disk_io_pending
= 0;
571 assert(_callback
.pending());
573 unpackHeader (buf
, len
);
579 * If our last read got some data the client wants, then give
580 * it to them, otherwise schedule another read.
582 size_t body_sz
= len
- mem
->swap_hdr_sz
;
584 if (copyInto
.offset
< static_cast<int64_t>(body_sz
)) {
586 * we have (part of) what they want
588 size_t copy_sz
= XMIN(copyInto
.length
, body_sz
);
589 debugs(90, 3, "storeClientReadHeader: copying " << copy_sz
<< " bytes of body");
590 xmemmove(copyInto
.data
, copyInto
.data
+ mem
->swap_hdr_sz
, copy_sz
);
592 if (copyInto
.offset
== 0 && len
> 0 && entry
->getReply()->sline
.status
== 0) {
593 /* Our structure ! */
594 HttpReply
*rep
= (HttpReply
*) entry
->getReply(); // bypass const
596 if (!rep
->parseCharBuf(copyInto
.data
, headersEnd(copyInto
.data
, copy_sz
))) {
597 debugs(90, 0, "could not parse headers from on disk structure!");
606 * we don't have what the client wants, but at least we now
607 * know the swap header size.
// Checks whether store client `sc` has a copy request outstanding on entry
// `e`. `sc` must belong to `e` (asserted); `data` is only referenced under
// STORE_CLIENT_LIST_DEBUG in the visible lines.
// NOTE(review): the values returned around the pending() test are not
// visible in this excerpt.
613 storeClientCopyPending(store_client
* sc
, StoreEntry
* e
, void *data
)
615 #if STORE_CLIENT_LIST_DEBUG
616 assert(sc
== storeClientListSearch(e
->mem_obj
, data
));
623 assert(sc
->entry
== e
);
631 if (!sc
->_callback
.pending())
638 * This routine hasn't been optimised to take advantage of the
// Detaches store client `sc` from entry `e`.
// Visible steps: the client's node is removed from mem->clients with
// dlinkDelete; an open swap-in handle is closed with storeClose, reset to
// NULL, and statCounter.swap.ins is incremented; a still-pending callback
// is logged; under STORE_CLIENT_LIST_DEBUG the owner reference is released;
// the entry's lock_count is asserted positive.
// `data` is only referenced under STORE_CLIENT_LIST_DEBUG in the visible
// lines.
// NOTE(review): the early exits after the "No matching client" and
// "Consistency failure" messages, the actions guarded by the
// store_status/swap_status test and by nclients == 0, the pending-callback
// notification and the release of `sc` are not visible in this excerpt.
642 storeUnregister(store_client
* sc
, StoreEntry
* e
, void *data
)
644 MemObject
*mem
= e
->mem_obj
;
645 #if STORE_CLIENT_LIST_DEBUG
647 assert(sc
== storeClientListSearch(e
->mem_obj
, data
));
653 debugs(90, 3, "storeUnregister: called for '" << e
->getMD5Text() << "'");
656 debugs(90, 3, "storeUnregister: No matching client for '" << e
->getMD5Text() << "'");
660 if (mem
->clientCount() == 0) {
661 debugs(90, 3, "storeUnregister: Consistency failure - store client being unregistered is not in the mem object's list for '" << e
->getMD5Text() << "'");
665 dlinkDelete(&sc
->node
, &mem
->clients
);
668 if (e
->store_status
== STORE_OK
&& e
->swap_status
!= SWAPOUT_DONE
)
671 if (sc
->swapin_sio
!= NULL
) {
672 storeClose(sc
->swapin_sio
);
673 sc
->swapin_sio
= NULL
;
674 statCounter
.swap
.ins
++;
677 if (sc
->_callback
.pending()) {
678 /* callback with ssize = -1 to indicate unexpected termination */
679 debugs(90, 3, "storeUnregister: store_client for " << mem
->url
<< " has a callback");
683 #if STORE_CLIENT_LIST_DEBUG
684 cbdataReferenceDone(sc
->owner
);
690 assert(e
->lock_count
> 0);
692 if (mem
->nclients
== 0)
700 /* Call handlers waiting for data to be appended to E. */
// Walks mem_obj->clients and calls storeClientCopy2() for clients on the
// list. The loop advances through `nx` rather than node->next in its
// increment, and tests each client for a pending callback and for disk I/O
// in flight before the storeClientCopy2() call. The walk is bracketed by
// PROF_start/PROF_stop(InvokeHandlers).
// NOTE(review): the statements guarded by those two tests (presumably
// `continue`), the assignment of `nx`, and the declarations of `node`, `sc`
// and `i` are not visible in this excerpt.
702 StoreEntry::invokeHandlers()
704 /* Commit what we can to disk, if appropriate */
708 dlink_node
*nx
= NULL
;
711 PROF_start(InvokeHandlers
);
713 debugs(90, 3, "InvokeHandlers: " << getMD5Text() );
714 /* walk the entire list looking for valid callbacks */
716 for (node
= mem_obj
->clients
.head
; node
; node
= nx
) {
717 sc
= (store_client
*)node
->data
;
719 debugs(90, 3, "StoreEntry::InvokeHandlers: checking client #" << i
++ );
721 if (!sc
->_callback
.pending())
724 if (sc
->flags
.disk_io_pending
)
727 storeClientCopy2(this, sc
);
729 PROF_stop(InvokeHandlers
);
// Computes the number of clients attached to entry `e`: mem->nclients, or 0
// when the entry has no mem_obj. The value is logged at debug level 3.
// NOTE(review): the return statement is not visible in this excerpt.
733 storePendingNClients(const StoreEntry
* e
)
735 MemObject
*mem
= e
->mem_obj
;
736 int npend
= NULL
== mem
? 0 : mem
->nclients
;
737 debugs(90, 3, "storePendingNClients: returning " << npend
);
741 /* return 1 if the request should be aborted */
// Decides whether fetching `entry` should be abandoned. Each branch logs
// its verdict as YES or NO; the final message pairs YES with "returning 1".
// Tests, in order:
//   YES - the request exists and is not marked cachable
//   YES - the entry carries KEY_PRIVATE
//   NO  - Config.quickAbort.min is negative ("disabled")
//   YES - more data received than the expected length
//   NO  - remaining bytes below quickAbort.min << 10
//   YES - remaining bytes above quickAbort.max << 10
//   NO  - expected length below 100 (keeps expectlen / 100 non-zero)
//   NO  - curlen / (expectlen / 100) exceeds quickAbort.pct
//   YES - otherwise
// expectlen is the reply's content_length plus hdr_sz; curlen is
// mem->endOffset(). The << 10 scaling suggests min/max are configured in
// KB - TODO confirm.
// NOTE(review): the return statements, and any handling of a negative
// expectlen hinted at by the inline comment, are not visible in this excerpt.
743 CheckQuickAbort2(StoreEntry
* entry
)
745 MemObject
* const mem
= entry
->mem_obj
;
747 debugs(90, 3, "CheckQuickAbort2: entry=" << entry
<< ", mem=" << mem
);
749 if (mem
->request
&& !mem
->request
->flags
.cachable
) {
750 debugs(90, 3, "CheckQuickAbort2: YES !mem->request->flags.cachable");
754 if (EBIT_TEST(entry
->flags
, KEY_PRIVATE
)) {
755 debugs(90, 3, "CheckQuickAbort2: YES KEY_PRIVATE");
759 int64_t expectlen
= entry
->getReply()->content_length
+ entry
->getReply()->hdr_sz
;
762 /* expectlen is < 0 if *no* information about the object has been received */
765 int64_t curlen
= mem
->endOffset ();
767 if (Config
.quickAbort
.min
< 0) {
768 debugs(90, 3, "CheckQuickAbort2: NO disabled");
772 if (curlen
> expectlen
) {
773 debugs(90, 3, "CheckQuickAbort2: YES bad content length");
777 if ((expectlen
- curlen
) < (Config
.quickAbort
.min
<< 10)) {
778 debugs(90, 3, "CheckQuickAbort2: NO only little more left");
782 if ((expectlen
- curlen
) > (Config
.quickAbort
.max
<< 10)) {
783 debugs(90, 3, "CheckQuickAbort2: YES too much left to go");
787 if (expectlen
< 100) {
788 debugs(90, 3, "CheckQuickAbort2: NO avoid FPE");
792 if ((curlen
/ (expectlen
/ 100)) > (Config
.quickAbort
.pct
)) {
793 debugs(90, 3, "CheckQuickAbort2: NO past point of no return");
797 debugs(90, 3, "CheckQuickAbort2: YES default, returning 1");
// Gatekeeper around CheckQuickAbort2(): tests whether the entry still has
// clients (storePendingNClients > 0), whether it is no longer
// STORE_PENDING, whether it is flagged ENTRY_SPECIAL, and finally whether
// CheckQuickAbort2() returned 0.
// NOTE(review): the statements guarded by these tests and the action taken
// once they have all been passed are not visible in this excerpt.
802 CheckQuickAbort(StoreEntry
* entry
)
806 if (storePendingNClients(entry
) > 0)
809 if (entry
->store_status
!= STORE_PENDING
)
812 if (EBIT_TEST(entry
->flags
, ENTRY_SPECIAL
))
815 if (CheckQuickAbort2(entry
) == 0)
// Appends a human-readable description of this client to `output`: the
// client number and callback data pointer, the copy offset and size, and
// the names of the flags currently set (disk_io_pending, store_copying,
// copy_event_pending), terminated by a newline.
// NOTE(review): the statement guarded by the _callback.pending() test and
// the argument of the copy_offset line are not visible in this excerpt.
// NOTE(review): the copy_offset format joins the string literal and PRId64
// without a space; C++11 and later read that as a literal suffix, so a
// space between the literal and the macro keeps it portable.
822 store_client::dumpStats(MemBuf
* output
, int clientNumber
) const
824 if (_callback
.pending())
827 output
->Printf("\tClient #%d, %p\n", clientNumber
, _callback
.callback_data
);
829 output
->Printf("\t\tcopy_offset: %"PRId64
"\n",
832 output
->Printf("\t\tcopy_size: %d\n",
833 (int) copyInto
.length
);
835 output
->Printf("\t\tflags:");
837 if (flags
.disk_io_pending
)
838 output
->Printf(" disk_io_pending");
840 if (flags
.store_copying
)
841 output
->Printf(" store_copying");
843 if (flags
.copy_event_pending
)
844 output
->Printf(" copy_event_pending");
846 output
->Printf("\n");
// A callback counts as pending only while both the handler and its data
// pointer are non-null.
850 store_client::Callback::pending() const
852 return callback_handler
&& callback_data
;
855 store_client::Callback::Callback(STCB
*function
, void *data
) : callback_handler(function
), callback_data (data
) {}
859 store_client::setDelayId(DelayId delay_id
)