2 * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
9 /* DEBUG: section 90 Storage Manager Client-Side Interface */
14 #include "HttpReply.h"
15 #include "HttpRequest.h"
17 #include "MemObject.h"
18 #include "mime_header.h"
19 #include "profiler/Profiler.h"
20 #include "SquidConfig.h"
21 #include "StatCounters.h"
23 #include "store_swapin.h"
24 #include "StoreClient.h"
25 #include "StoreMeta.h"
26 #include "StoreMetaUnpacker.h"
28 #include "DelayPools.h"
32 * NOTE: 'Header' refers to the swapfile metadata header.
33 * 'OBJHeader' refers to the object header, with canonical
34 * processed object headers (which may derive from FTP/HTTP etc
36 * 'Body' refers to the swapfile body, which is the full
37 * HTTP reply (including HTTP headers and body).
39 static StoreIOState::STRCB storeClientReadBody
;
40 static StoreIOState::STRCB storeClientReadHeader
;
41 static void storeClientCopy2(StoreEntry
* e
, store_client
* sc
);
42 static EVH storeClientCopyEvent
;
43 static bool CheckQuickAbortIsReasonable(StoreEntry
* entry
);
45 CBDATA_CLASS_INIT(store_client
);
48 store_client::memReaderHasLowerOffset(int64_t anOffset
) const
50 return getType() == STORE_MEM_CLIENT
&& copyInto
.offset
< anOffset
;
54 store_client::getType() const
59 #if STORE_CLIENT_LIST_DEBUG
61 storeClientListSearch(const MemObject
* mem
, void *data
)
64 store_client
*sc
= NULL
;
66 for (node
= mem
->clients
.head
; node
; node
= node
->next
) {
69 if (sc
->owner
== data
)
77 storeClientIsThisAClient(store_client
* sc
, void *someClient
)
79 return sc
->owner
== someClient
;
83 #include "HttpRequest.h"
85 /* add client with fd to client list */
87 storeClientListAdd(StoreEntry
* e
, void *data
)
89 MemObject
*mem
= e
->mem_obj
;
92 #if STORE_CLIENT_LIST_DEBUG
94 if (storeClientListSearch(mem
, data
) != NULL
)
100 sc
= new store_client (e
);
108 store_client::callback(ssize_t sz
, bool error
)
112 if (sz
>= 0 && !error
)
115 StoreIOBuffer
result(bSz
, 0 ,copyInto
.data
);
118 result
.flags
.error
= 1;
120 result
.offset
= cmp_offset
;
121 assert(_callback
.pending());
122 cmp_offset
= copyInto
.offset
+ bSz
;
123 STCB
*temphandler
= _callback
.callback_handler
;
124 void *cbdata
= _callback
.callback_data
;
125 _callback
= Callback(NULL
, NULL
);
126 copyInto
.data
= NULL
;
128 if (cbdataReferenceValid(cbdata
))
129 temphandler(cbdata
, result
);
131 cbdataReferenceDone(cbdata
);
135 storeClientCopyEvent(void *data
)
137 store_client
*sc
= (store_client
*)data
;
138 debugs(90, 3, "storeClientCopyEvent: Running");
139 assert (sc
->flags
.copy_event_pending
);
140 sc
->flags
.copy_event_pending
= false;
142 if (!sc
->_callback
.pending())
145 storeClientCopy2(sc
->entry
, sc
);
148 store_client::store_client(StoreEntry
*e
) :
150 #if STORE_CLIENT_LIST_DEBUG
151 owner(cbdataReference(data
)),
154 type(e
->storeClientType()),
157 flags
.disk_io_pending
= false;
158 flags
.store_copying
= false;
159 flags
.copy_event_pending
= false;
162 if (getType() == STORE_DISK_CLIENT
) {
163 /* assert we'll be able to get the data we want */
164 /* maybe we should open swapin_sio here */
165 assert(entry
->swap_filen
> -1 || entry
->swappingOut());
169 store_client::~store_client()
172 /* copy bytes requested by the client */
174 storeClientCopy(store_client
* sc
,
176 StoreIOBuffer copyInto
,
181 sc
->copy(e
, copyInto
,callback
,data
);
185 store_client::copy(StoreEntry
* anEntry
,
186 StoreIOBuffer copyRequest
,
190 assert (anEntry
== entry
);
191 assert (callback_fn
);
193 assert(!EBIT_TEST(entry
->flags
, ENTRY_ABORTED
));
194 debugs(90, 3, "store_client::copy: " << entry
->getMD5Text() << ", from " <<
195 copyRequest
.offset
<< ", for length " <<
196 (int) copyRequest
.length
<< ", cb " << callback_fn
<< ", cbdata " <<
199 #if STORE_CLIENT_LIST_DEBUG
201 assert(this == storeClientListSearch(entry
->mem_obj
, data
));
204 assert(!_callback
.pending());
205 #if ONLYCONTIGUOUSREQUESTS
207 assert(cmp_offset
== copyRequest
.offset
);
209 /* range requests will skip into the body */
210 cmp_offset
= copyRequest
.offset
;
211 _callback
= Callback (callback_fn
, cbdataReference(data
));
212 copyInto
.data
= copyRequest
.data
;
213 copyInto
.length
= copyRequest
.length
;
214 copyInto
.offset
= copyRequest
.offset
;
216 static bool copying (false);
219 PROF_start(storeClient_kickReads
);
220 /* we might be blocking comm reads due to readahead limits
221 * now we have a new offset, trigger those reads...
223 entry
->mem_obj
->kickReads();
224 PROF_stop(storeClient_kickReads
);
227 anEntry
->lock("store_client::copy"); // see deletion note below
229 storeClientCopy2(entry
, this);
231 // Bug 3480: This store_client object may be deleted now if, for example,
232 // the client rejects the hit response copied above. Use on-stack pointers!
235 anEntry
->kickProducer();
237 anEntry
->unlock("store_client::copy");
239 // Add no code here. This object may no longer exist.
242 /// Whether there is (or will be) more entry data for us.
244 store_client::moreToSend() const
246 if (entry
->store_status
== STORE_PENDING
)
247 return true; // there may be more coming
249 /* STORE_OK, including aborted entries: no more data is coming */
251 const int64_t len
= entry
->objectLen();
253 // If we do not know the entry length, then we have to open the swap file.
254 const bool canSwapIn
= entry
->swap_filen
>= 0;
258 if (copyInto
.offset
>= len
)
259 return false; // sent everything there is
262 return true; // if we lack prefix, we can swap it in
264 // If we cannot swap in, make sure we have what we want in RAM. Otherwise,
265 // scheduleRead calls scheduleDiskRead which asserts without a swap file.
266 const MemObject
*mem
= entry
->mem_obj
;
268 mem
->inmem_lo
<= copyInto
.offset
&& copyInto
.offset
< mem
->endOffset();
272 storeClientCopy2(StoreEntry
* e
, store_client
* sc
)
274 /* reentrancy not allowed - note this could lead to
278 if (sc
->flags
.copy_event_pending
) {
282 if (EBIT_TEST(e
->flags
, ENTRY_FWD_HDR_WAIT
)) {
283 debugs(90, 5, "storeClientCopy2: returning because ENTRY_FWD_HDR_WAIT set");
287 if (sc
->flags
.store_copying
) {
288 sc
->flags
.copy_event_pending
= true;
289 debugs(90, 3, "storeClientCopy2: Queueing storeClientCopyEvent()");
290 eventAdd("storeClientCopyEvent", storeClientCopyEvent
, sc
, 0.0, 0);
294 debugs(90, 3, "storeClientCopy2: " << e
->getMD5Text());
295 assert(sc
->_callback
.pending());
297 * We used to check for ENTRY_ABORTED here. But there were some
298 * problems. For example, we might have a slow client (or two) and
299 * the peer server is reading far ahead and swapping to disk. Even
300 * if the peer aborts, we want to give the client(s)
301 * everything we got before the abort condition occurred.
303 /* Warning: doCopy may indirectly free itself in callbacks,
304 * hence the lock to keep it active for the duration of
306 * XXX: Locking does not prevent calling sc destructor (it only prevents
307 * freeing sc memory) so sc may become invalid from C++ p.o.v.
309 CbcPointer
<store_client
> tmpLock
= sc
;
310 assert (!sc
->flags
.store_copying
);
312 assert(!sc
->flags
.store_copying
);
316 store_client::doCopy(StoreEntry
*anEntry
)
318 assert (anEntry
== entry
);
319 flags
.store_copying
= true;
320 MemObject
*mem
= entry
->mem_obj
;
322 debugs(33, 5, "store_client::doCopy: co: " <<
323 copyInto
.offset
<< ", hi: " <<
327 /* There is no more to send! */
328 debugs(33, 3, HERE
<< "There is no more to send!");
330 flags
.store_copying
= false;
334 /* Check that we actually have data */
335 if (anEntry
->store_status
== STORE_PENDING
&& copyInto
.offset
>= mem
->endOffset()) {
336 debugs(90, 3, "store_client::doCopy: Waiting for more");
337 flags
.store_copying
= false;
342 * Slight weirdness here. We open a swapin file for any
343 * STORE_DISK_CLIENT, even if we can copy the requested chunk
344 * from memory in the next block. We must try to open the
345 * swapin file before sending any data to the client side. If
346 * we postpone the open, and then can not open the file later
347 * on, the client loses big time. Its transfer just gets cut
348 * off. Better to open it early (while the client side handler
349 * is clientCacheHit) so that we can fall back to a cache miss
353 if (STORE_DISK_CLIENT
== getType() && swapin_sio
== NULL
) {
360 /// opens the swapin "file" if possible; otherwise, fail()s and returns false
362 store_client::startSwapin()
364 debugs(90, 3, "store_client::doCopy: Need to open swap in file");
365 /* gotta open the swapin file */
367 if (storeTooManyDiskFilesOpen()) {
368 /* yuck -- this causes a TCP_SWAPFAIL_MISS on the client side */
370 flags
.store_copying
= false;
372 } else if (!flags
.disk_io_pending
) {
373 /* Don't set store_io_pending here */
374 storeSwapInStart(this);
376 if (swapin_sio
== NULL
) {
378 flags
.store_copying
= false;
384 debugs(90, DBG_IMPORTANT
, "WARNING: Averted multiple fd operation (1)");
385 flags
.store_copying
= false;
391 store_client::scheduleRead()
393 MemObject
*mem
= entry
->mem_obj
;
395 if (copyInto
.offset
>= mem
->inmem_lo
&& copyInto
.offset
< mem
->endOffset())
402 store_client::scheduleDiskRead()
404 /* What the client wants is not in memory. Schedule a disk read */
405 if (getType() == STORE_DISK_CLIENT
) {
406 // we should have called startSwapin() already
407 assert(swapin_sio
!= NULL
);
408 } else if (!swapin_sio
&& !startSwapin()) {
409 debugs(90, 3, "bailing after swapin start failure for " << *entry
);
410 assert(!flags
.store_copying
);
414 assert(!flags
.disk_io_pending
);
416 debugs(90, 3, "reading " << *entry
<< " from disk");
420 flags
.store_copying
= false;
424 store_client::scheduleMemRead()
426 /* What the client wants is in memory */
428 debugs(90, 3, "store_client::doCopy: Copying normal from memory");
429 size_t sz
= entry
->mem_obj
->data_hdr
.copy(copyInto
);
431 flags
.store_copying
= false;
435 store_client::fileRead()
437 MemObject
*mem
= entry
->mem_obj
;
439 assert(_callback
.pending());
440 assert(!flags
.disk_io_pending
);
441 flags
.disk_io_pending
= true;
443 if (mem
->swap_hdr_sz
!= 0)
444 if (entry
->swap_status
== SWAPOUT_WRITING
)
445 assert(mem
->swapout
.sio
->offset() > copyInto
.offset
+ (int64_t)mem
->swap_hdr_sz
);
447 storeRead(swapin_sio
,
450 copyInto
.offset
+ mem
->swap_hdr_sz
,
451 mem
->swap_hdr_sz
== 0 ? storeClientReadHeader
452 : storeClientReadBody
,
457 store_client::readBody(const char *, ssize_t len
)
459 int parsed_header
= 0;
461 // Don't assert disk_io_pending here.. may be called by read_header
462 flags
.disk_io_pending
= false;
463 assert(_callback
.pending());
464 debugs(90, 3, "storeClientReadBody: len " << len
<< "");
466 if (copyInto
.offset
== 0 && len
> 0 && entry
->getReply()->sline
.status() == Http::scNone
) {
467 /* Our structure ! */
468 HttpReply
*rep
= (HttpReply
*) entry
->getReply(); // bypass const
470 if (!rep
->parseCharBuf(copyInto
.data
, headersEnd(copyInto
.data
, len
))) {
471 debugs(90, DBG_CRITICAL
, "Could not parse headers from on disk object");
477 const HttpReply
*rep
= entry
->getReply();
478 if (len
> 0 && rep
&& entry
->mem_obj
->inmem_lo
== 0 && entry
->objectLen() <= (int64_t)Config
.Store
.maxInMemObjSize
&& Config
.onoff
.memory_cache_disk
) {
479 storeGetMemSpace(len
);
480 // The above may start to free our object so we need to check again
481 if (entry
->mem_obj
->inmem_lo
== 0) {
482 /* Copy read data back into memory.
483 * copyInto.offset includes headers, which is what mem cache needs
485 int64_t mem_offset
= entry
->mem_obj
->endOffset();
486 if ((copyInto
.offset
== mem_offset
) || (parsed_header
&& mem_offset
== rep
->hdr_sz
)) {
487 entry
->mem_obj
->write(StoreIOBuffer(len
, copyInto
.offset
, copyInto
.data
));
499 /* synchronous open failures callback from the store,
500 * before startSwapin detects the failure.
501 * TODO: fix this inconsistent behaviour - probably by
502 * having storeSwapInStart become a callback function,
506 if (_callback
.pending())
511 storeClientReadHeader(void *data
, const char *buf
, ssize_t len
, StoreIOState::Pointer
)
513 store_client
*sc
= (store_client
*)data
;
514 sc
->readHeader(buf
, len
);
518 storeClientReadBody(void *data
, const char *buf
, ssize_t len
, StoreIOState::Pointer
)
520 store_client
*sc
= (store_client
*)data
;
521 sc
->readBody(buf
, len
);
525 store_client::unpackHeader(char const *buf
, ssize_t len
)
527 int xerrno
= errno
; // FIXME: where does errno come from?
528 debugs(90, 3, "store_client::unpackHeader: len " << len
<< "");
531 debugs(90, 3, "WARNING: unpack error: " << xstrerr(xerrno
));
536 StoreMetaUnpacker
aBuilder(buf
, len
, &swap_hdr_sz
);
538 if (!aBuilder
.isBufferSane()) {
539 /* oops, bad disk file? */
540 debugs(90, DBG_IMPORTANT
, "WARNING: swapfile header inconsistent with available data");
544 tlv
*tlv_list
= aBuilder
.createStoreMeta ();
546 if (tlv_list
== NULL
) {
547 debugs(90, DBG_IMPORTANT
, "WARNING: failed to unpack meta data");
552 * Check the meta data and make sure we got the right object.
554 for (tlv
*t
= tlv_list
; t
; t
= t
->next
) {
555 if (!t
->checkConsistency(entry
)) {
556 storeSwapTLVFree(tlv_list
);
561 storeSwapTLVFree(tlv_list
);
563 assert(swap_hdr_sz
>= 0);
564 entry
->mem_obj
->swap_hdr_sz
= swap_hdr_sz
;
565 if (entry
->swap_file_sz
> 0) { // collapsed hits may not know swap_file_sz
566 assert(entry
->swap_file_sz
>= static_cast<uint64_t>(swap_hdr_sz
));
567 entry
->mem_obj
->object_sz
= entry
->swap_file_sz
- swap_hdr_sz
;
569 debugs(90, 5, "store_client::unpackHeader: swap_file_sz=" <<
570 entry
->swap_file_sz
<< "( " << swap_hdr_sz
<< " + " <<
571 entry
->mem_obj
->object_sz
<< ")");
576 store_client::readHeader(char const *buf
, ssize_t len
)
578 MemObject
*const mem
= entry
->mem_obj
;
580 assert(flags
.disk_io_pending
);
581 flags
.disk_io_pending
= false;
582 assert(_callback
.pending());
584 // abort if we fail()'d earlier
588 if (!unpackHeader(buf
, len
)) {
594 * If our last read got some data the client wants, then give
595 * it to them, otherwise schedule another read.
597 size_t body_sz
= len
- mem
->swap_hdr_sz
;
599 if (copyInto
.offset
< static_cast<int64_t>(body_sz
)) {
601 * we have (part of) what they want
603 size_t copy_sz
= min(copyInto
.length
, body_sz
);
604 debugs(90, 3, "storeClientReadHeader: copying " << copy_sz
<< " bytes of body");
605 memmove(copyInto
.data
, copyInto
.data
+ mem
->swap_hdr_sz
, copy_sz
);
607 readBody(copyInto
.data
, copy_sz
);
613 * we don't have what the client wants, but at least we now
614 * know the swap header size.
620 storeClientCopyPending(store_client
* sc
, StoreEntry
* e
, void *data
)
622 #if STORE_CLIENT_LIST_DEBUG
623 assert(sc
== storeClientListSearch(e
->mem_obj
, data
));
630 assert(sc
->entry
== e
);
638 if (!sc
->_callback
.pending())
645 * This routine hasn't been optimised to take advantage of the
649 storeUnregister(store_client
* sc
, StoreEntry
* e
, void *data
)
651 MemObject
*mem
= e
->mem_obj
;
652 #if STORE_CLIENT_LIST_DEBUG
654 assert(sc
== storeClientListSearch(e
->mem_obj
, data
));
660 debugs(90, 3, "storeUnregister: called for '" << e
->getMD5Text() << "'");
663 debugs(90, 3, "storeUnregister: No matching client for '" << e
->getMD5Text() << "'");
667 if (mem
->clientCount() == 0) {
668 debugs(90, 3, "storeUnregister: Consistency failure - store client being unregistered is not in the mem object's list for '" << e
->getMD5Text() << "'");
672 dlinkDelete(&sc
->node
, &mem
->clients
);
675 if (e
->store_status
== STORE_OK
&& e
->swap_status
!= SWAPOUT_DONE
)
678 if (sc
->swapin_sio
!= NULL
) {
679 storeClose(sc
->swapin_sio
, StoreIOState::readerDone
);
680 sc
->swapin_sio
= NULL
;
681 ++statCounter
.swap
.ins
;
684 if (sc
->_callback
.pending()) {
685 /* callback with ssize = -1 to indicate unexpected termination */
686 debugs(90, 3, "store_client for " << *e
<< " has a callback");
690 #if STORE_CLIENT_LIST_DEBUG
691 cbdataReferenceDone(sc
->owner
);
698 // An entry locked by others may be unlocked (and destructed) by others, so
699 // we must lock again to safely dereference e after CheckQuickAbortIsReasonable().
700 e
->lock("storeUnregister");
702 if (CheckQuickAbortIsReasonable(e
))
711 e
->unlock("storeUnregister");
715 /* Call handlers waiting for data to be appended to E. */
717 StoreEntry::invokeHandlers()
719 /* Commit what we can to disk, if appropriate */
723 dlink_node
*nx
= NULL
;
726 PROF_start(InvokeHandlers
);
728 debugs(90, 3, "InvokeHandlers: " << getMD5Text() );
729 /* walk the entire list looking for valid callbacks */
731 for (node
= mem_obj
->clients
.head
; node
; node
= nx
) {
732 sc
= (store_client
*)node
->data
;
734 debugs(90, 3, "StoreEntry::InvokeHandlers: checking client #" << i
);
737 if (!sc
->_callback
.pending())
740 if (sc
->flags
.disk_io_pending
)
743 storeClientCopy2(this, sc
);
745 PROF_stop(InvokeHandlers
);
748 // Does not account for remote readers/clients.
750 storePendingNClients(const StoreEntry
* e
)
752 MemObject
*mem
= e
->mem_obj
;
753 int npend
= NULL
== mem
? 0 : mem
->nclients
;
754 debugs(90, 3, "storePendingNClients: returning " << npend
);
758 /* return true if the request should be aborted */
760 CheckQuickAbortIsReasonable(StoreEntry
* entry
)
763 debugs(90, 3, "entry=" << *entry
);
765 if (storePendingNClients(entry
) > 0) {
766 debugs(90, 3, "quick-abort? NO storePendingNClients() > 0");
770 if (!shutting_down
&& Store::Root().transientReaders(*entry
)) {
771 debugs(90, 3, "quick-abort? NO still have one or more transient readers");
775 if (entry
->store_status
!= STORE_PENDING
) {
776 debugs(90, 3, "quick-abort? NO store_status != STORE_PENDING");
780 if (EBIT_TEST(entry
->flags
, ENTRY_SPECIAL
)) {
781 debugs(90, 3, "quick-abort? NO ENTRY_SPECIAL");
785 MemObject
* const mem
= entry
->mem_obj
;
787 debugs(90, 3, "mem=" << mem
);
789 if (mem
->request
&& !mem
->request
->flags
.cachable
) {
790 debugs(90, 3, "quick-abort? YES !mem->request->flags.cachable");
794 if (EBIT_TEST(entry
->flags
, KEY_PRIVATE
)) {
795 debugs(90, 3, "quick-abort? YES KEY_PRIVATE");
799 int64_t expectlen
= entry
->getReply()->content_length
+ entry
->getReply()->hdr_sz
;
802 /* expectlen is < 0 if *no* information about the object has been received */
803 debugs(90, 3, "quick-abort? YES no object data received yet");
807 int64_t curlen
= mem
->endOffset();
809 if (Config
.quickAbort
.min
< 0) {
810 debugs(90, 3, "quick-abort? NO disabled");
814 if (mem
->request
&& mem
->request
->range
&& mem
->request
->getRangeOffsetLimit() < 0) {
815 /* Don't abort if the admin has configured range_offset_limit -1 to download fully for caching. */
816 debugs(90, 3, "quick-abort? NO admin configured range replies to full-download");
820 if (curlen
> expectlen
) {
821 debugs(90, 3, "quick-abort? YES bad content length (" << curlen
<< " of " << expectlen
<< " bytes received)");
825 if ((expectlen
- curlen
) < (Config
.quickAbort
.min
<< 10)) {
826 debugs(90, 3, "quick-abort? NO only a little more object left to receive");
830 if ((expectlen
- curlen
) > (Config
.quickAbort
.max
<< 10)) {
831 debugs(90, 3, "quick-abort? YES too much left to go");
835 if (expectlen
< 100) {
836 debugs(90, 3, "quick-abort? NO avoid FPE");
840 if ((curlen
/ (expectlen
/ 100)) > (Config
.quickAbort
.pct
)) {
841 debugs(90, 3, "quick-abort? NO past point of no return");
845 debugs(90, 3, "quick-abort? YES default");
850 store_client::dumpStats(MemBuf
* output
, int clientNumber
) const
852 if (_callback
.pending())
855 output
->appendf("\tClient #%d, %p\n", clientNumber
, _callback
.callback_data
);
856 output
->appendf("\t\tcopy_offset: %" PRId64
"\n", copyInto
.offset
);
857 output
->appendf("\t\tcopy_size: %" PRIuSIZE
"\n", copyInto
.length
);
858 output
->append("\t\tflags:", 8);
860 if (flags
.disk_io_pending
)
861 output
->append(" disk_io_pending", 16);
863 if (flags
.store_copying
)
864 output
->append(" store_copying", 14);
866 if (flags
.copy_event_pending
)
867 output
->append(" copy_event_pending", 19);
869 output
->append("\n",1);
873 store_client::Callback::pending() const
875 return callback_handler
&& callback_data
;
878 store_client::Callback::Callback(STCB
*function
, void *data
) : callback_handler(function
), callback_data (data
) {}
882 store_client::setDelayId(DelayId delay_id
)