+++ /dev/null
-/*
- * Copyright (C) 1996-2022 The Squid Software Foundation and contributors
- *
- * Squid software is distributed under GPLv2+ license and includes
- * contributions from numerous individuals and organizations.
- * Please see the COPYING and CONTRIBUTORS files for details.
- */
-
-/* DEBUG: section 05 Comm */
-
-#ifndef COMMREAD_H
-#define COMMREAD_H
-
-#include "base/CbDataList.h"
-#include "comm.h"
-#include "comm/forward.h"
-#include "CommCalls.h"
-
-class CommRead
-{
-
-public:
- CommRead();
- CommRead(const Comm::ConnectionPointer &c, char *buf, int len, AsyncCall::Pointer &callback);
- Comm::ConnectionPointer conn;
- char *buf;
- int len;
- AsyncCall::Pointer callback;
-};
-
-class DeferredRead
-{
-
-public:
- typedef void DeferrableRead(void *context, CommRead const &);
- DeferredRead ();
- DeferredRead (DeferrableRead *, void *, CommRead const &);
- void markCancelled();
- DeferrableRead *theReader;
- void *theContext;
- CommRead theRead;
- bool cancelled;
- AsyncCall::Pointer closer; ///< internal close handler used by Comm
-
-private:
-};
-
-class DeferredReadManager
-{
-
-public:
- ~DeferredReadManager();
- void delayRead(DeferredRead const &);
- void kickReads(int const count);
-
-private:
- static CLCB CloseHandler;
- static DeferredRead popHead(CbDataListContainer<DeferredRead> &deferredReads);
- void kickARead(DeferredRead const &);
- void flushReads();
- CbDataListContainer<DeferredRead> deferredReads;
-};
-
-#endif /* COMMREAD_H */
-
#if USE_DELAY_POOLS
#include "auth/UserRequest.h"
-#include "CommRead.h"
+#include "base/DelayedAsyncCalls.h"
#include "DelayIdComposite.h"
#include "DelayPools.h"
#include "ip/Address.h"
class CompositeSelectionDetails;
virtual DelayIdComposite::Pointer id(CompositeSelectionDetails &) = 0;
- void delayRead(DeferredRead const &);
+ void delayRead(const AsyncCallPointer &);
/// \ingroup DelayPoolsAPI
class CompositeSelectionDetails
protected:
void kickReads();
- DeferredReadManager deferredReads;
+ DelayedAsyncCalls deferredReads;
};
#endif /* USE_DELAY_POOLS */
*/
#if USE_DELAY_POOLS
#include "acl/FilledChecklist.h"
+#include "base/DelayedAsyncCalls.h"
#include "client_side_request.h"
-#include "CommRead.h"
#include "DelayId.h"
#include "DelayPool.h"
#include "DelayPools.h"
}
void
-DelayId::delayRead(DeferredRead const &aRead)
+DelayId::delayRead(const AsyncCall::Pointer &aRead)
{
assert (compositeId != NULL);
compositeId->delayRead(aRead);
#if USE_DELAY_POOLS
+#include "base/forward.h"
#include "DelayIdComposite.h"
class ClientHttpRequest;
int bytesWanted(int min, int max) const;
void bytesIn (int qty);
void setNoDelay(bool const);
- void delayRead(DeferredRead const &);
+ void delayRead(const AsyncCallPointer &);
private:
unsigned short pool_;
#define DELAYIDCOMPOSITE_H
#if USE_DELAY_POOLS
+#include "base/forward.h"
#include "base/RefCount.h"
#include "fatal.h"
-class DeferredRead;
-
class DelayIdComposite : public RefCountable
{
virtual int bytesWanted (int min, int max) const =0;
virtual void bytesIn(int qty) = 0;
/* only aggregate and vector need this today */
- virtual void delayRead(DeferredRead const &) {fatal("Not implemented");}
+ virtual void delayRead(const AsyncCallPointer &) { fatal("Not implemented"); }
};
#endif /* USE_DELAY_POOLS */
// TODO: create DelayIdComposite.cc
void
-CompositePoolNode::delayRead(DeferredRead const &aRead)
+CompositePoolNode::delayRead(const AsyncCall::Pointer &aRead)
{
- deferredReads.delayRead(aRead);
+ deferredReads.delay(aRead);
}
#include "comm.h"
void
CompositePoolNode::kickReads()
{
- deferredReads.kickReads(-1);
+ deferredReads.schedule();
}
#endif /* USE_DELAY_POOLS */
}
void
-DelayTagged::Id::delayRead(DeferredRead const &aRead)
+DelayTagged::Id::delayRead(const AsyncCall::Pointer &aRead)
{
theTagged->delayRead(aRead);
}
#if USE_DELAY_POOLS
#include "auth/Gadgets.h"
+#include "base/forward.h"
#include "CompositePoolNode.h"
#include "DelayBucket.h"
#include "DelayIdComposite.h"
~Id();
virtual int bytesWanted (int min, int max) const;
virtual void bytesIn(int qty);
- virtual void delayRead(DeferredRead const &);
+ virtual void delayRead(const AsyncCallPointer &);
private:
RefCount<DelayTagged> theTagged;
#include "squid.h"
#if USE_DELAY_POOLS
+#include "base/AsyncCall.h"
+#include "base/DelayedAsyncCalls.h"
#include "comm/Connection.h"
-#include "CommRead.h"
#include "DelayVector.h"
DelayVector::DelayVector()
}
void
-DelayVector::Id::delayRead(DeferredRead const &aRead)
+DelayVector::Id::delayRead(const AsyncCallPointer &aRead)
{
theVector->delayRead(aRead);
}
#if USE_DELAY_POOLS
+#include "base/forward.h"
#include "CompositePoolNode.h"
/// \ingroup DelayPoolsAPI
~Id();
virtual int bytesWanted (int min, int max) const;
virtual void bytesIn(int qty);
- virtual void delayRead(DeferredRead const &);
+ virtual void delayRead(const AsyncCallPointer &);
private:
RefCount<DelayVector> theVector;
/* DEBUG: section 79 Squid-side DISKD I/O functions. */
#include "squid.h"
+#include "comm.h"
#include "comm/Loops.h"
#include "ConfigOption.h"
#include "diomsg.h"
/* DEBUG: section 79 Disk IO Routines */
#include "squid.h"
+#include "comm.h"
#include "DiskIO/IORequestor.h"
#include "DiskIO/ReadRequest.h"
#include "DiskIO/WriteRequest.h"
CollapsedForwarding.cc \
CollapsedForwarding.h \
CollapsingHistory.h \
- CommRead.h \
CommandLine.cc \
CommandLine.h \
ConfigOption.cc \
EventLoop.cc \
tests/stub_debug.cc \
tests/stub_fatal.cc \
- tests/stub_libtime.cc
+ tests/stub_libtime.cc \
+ tests/stub_SBuf.cc
tests_testEventLoop_LDADD = \
base/libbase.la \
$(LIBCPPUNIT_LIBS) \
}
void
-MemObject::delayRead(DeferredRead const &aRead)
+MemObject::delayRead(const AsyncCall::Pointer &aRead)
{
#if USE_DELAY_POOLS
if (readAheadPolicyCanRead()) {
}
}
#endif
- deferredReads.delayRead(aRead);
+ deferredReads.delay(aRead);
}
void
MemObject::kickReads()
{
- deferredReads.kickReads(-1);
+ deferredReads.schedule();
}
#if USE_DELAY_POOLS
#ifndef SQUID_MEMOBJECT_H
#define SQUID_MEMOBJECT_H
-#include "CommRead.h"
+#include "base/DelayedAsyncCalls.h"
#include "dlink.h"
#include "http/RequestMethod.h"
#include "RemovalPolicy.h"
PeerSelector *ircb_data = nullptr;
/// used for notifying StoreEntry writers about 3rd-party initiated aborts
- AsyncCall::Pointer abortCallback;
+ AsyncCallPointer abortCallback;
RemovalPolicyNode repl;
int id = 0;
int64_t object_sz = -1;
SBuf vary_headers;
- void delayRead(DeferredRead const &);
+ void delayRead(const AsyncCallPointer &);
void kickReads();
private:
mutable String storeId_; ///< StoreId for our entry (usually request URI)
mutable String logUri_; ///< URI used for logging (usually request URI)
- DeferredReadManager deferredReads;
+ DelayedAsyncCalls deferredReads;
};
/** global current memory removal policy */
#ifndef SQUID_STORE_H
#define SQUID_STORE_H
+#include "base/DelayedAsyncCalls.h"
#include "base/Packable.h"
#include "base/Range.h"
#include "base/RefCount.h"
#include "comm/forward.h"
-#include "CommRead.h"
#include "hash.h"
#include "http/forward.h"
#include "http/RequestMethod.h"
{
public:
- static DeferredRead::DeferrableRead DeferReader;
bool checkDeferRead(int fd) const;
const char *getMD5Text() const;
void destroyMemObject();
int checkTooSmall();
- void delayAwareRead(const Comm::ConnectionPointer &conn, char *buf, int len, AsyncCall::Pointer callback);
-
void setNoDelay (bool const);
void lastModified(const time_t when) { lastModified_ = when; }
/// \returns entry's 'effective' modification time
const bool wantToPostpone = isRepeatable || canStartBypass || protectGroupBypass;
// Why > 2? HttpState does not use the last bytes in the buffer
- // because delayAwareRead() is arguably broken. See
+ // because Client::delayRead() is arguably broken. See
// HttpStateData::maybeReadVirginBody for more details.
if (wantToPostpone && bp.buf().spaceSize() > 2) {
// Postponing may increase memory footprint and slow the HTTP side
*/
class CallDialer;
-class AsyncCallQueue;
class AsyncCall: public RefCountable
{
public:
typedef RefCount <AsyncCall> Pointer;
- friend class AsyncCallQueue;
AsyncCall(int aDebugSection, int aDebugLevel, const char *aName);
virtual ~AsyncCall();
virtual void fire() = 0;
- AsyncCall::Pointer theNext; // used exclusively by AsyncCallQueue
+ AsyncCall::Pointer theNext; ///< for AsyncCallList and similar lists
private:
const char *isCanceled; // set to the cancellation reason by cancel()
--- /dev/null
+/*
+ * Copyright (C) 1996-2022 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#include "squid.h"
+#include "base/Assure.h"
+#include "base/AsyncCall.h"
+#include "base/AsyncCallList.h"
+
+void
+AsyncCallList::add(const AsyncCall::Pointer &call)
+{
+ Assure(call);
+ Assure(!call->Next());
+ if (tail) { // append to the existing list
+ Assure(head);
+ Assure(!tail->Next());
+ tail->setNext(call);
+ tail = call;
+ } else { // create a list from scratch
+ Assure(!head);
+ head = tail = call;
+ }
+ ++length;
+ Assure(length); // no overflows
+}
+
+AsyncCall::Pointer
+AsyncCallList::extract()
+{
+ if (!head)
+ return AsyncCallPointer();
+
+ Assure(tail);
+ Assure(length);
+ const auto call = head;
+ head = call->Next();
+ call->setNext(nullptr);
+ if (tail == call)
+ tail = nullptr;
+ --length;
+ return call;
+}
+
--- /dev/null
+/*
+ * Copyright (C) 1996-2022 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#ifndef SQUID_BASE_ASYNCCALLLIST_H
+#define SQUID_BASE_ASYNCCALLLIST_H
+
+#include "base/forward.h"
+#include "base/RefCount.h"
+
+/// An efficient (but intrusive) AsyncCall storage preserving FIFO order.
+/// A given AsyncCall object may reside in at most one such storage.
+class AsyncCallList
+{
+public:
+ AsyncCallList() = default;
+ // prohibit copying: no AsyncCall should be present in two lists
+ AsyncCallList(const AsyncCallList &) = delete;
+ AsyncCallList &operator=(const AsyncCallList &) = delete;
+
+ /// stores the given async call
+ void add(const AsyncCallPointer &);
+
+ /// removes the earliest add()-ed call that is still stored (if any)
+ /// \returns the removed call (or nil)
+ /// \retval nil means the list stored no calls at extract() time
+ AsyncCallPointer extract();
+
+ /// the number of currently stored calls
+ size_t size() const { return length; }
+
+private:
+ AsyncCallPointer head; ///< the earliest still-stored call (or nil)
+ AsyncCallPointer tail; ///< the latest still-stored call (or nil)
+ size_t length = 0; ///< \copydoc size()
+};
+
+#endif /* SQUID_BASE_ASYNCCALLLIST_H */
+
AsyncCallQueue *AsyncCallQueue::TheInstance = 0;
-AsyncCallQueue::AsyncCallQueue(): theHead(NULL), theTail(NULL)
-{
-}
-
-void AsyncCallQueue::schedule(AsyncCall::Pointer &call)
-{
- assert(call != NULL);
- assert(!call->theNext);
- if (theHead != NULL) { // append
- assert(!theTail->theNext);
- theTail->theNext = call;
- theTail = call;
- } else { // create queue from cratch
- theHead = theTail = call;
- }
-}
-
// Fire all scheduled calls; returns true if at least one call was fired.
// The calls may be added while the current call is in progress.
bool
AsyncCallQueue::fire()
{
- const bool made = theHead != NULL;
- while (theHead) {
- CodeContext::Reset(theHead->codeContext);
- fireNext();
+ const auto made = scheduled.size() > 0;
+ while (const auto call = scheduled.extract()) {
+ CodeContext::Reset(call->codeContext);
+ debugs(call->debugSection, call->debugLevel, "entering " << *call);
+ call->make();
+ debugs(call->debugSection, call->debugLevel, "leaving " << *call);
}
if (made)
CodeContext::Reset();
return made;
}
-void
-AsyncCallQueue::fireNext()
-{
- AsyncCall::Pointer call = theHead;
- theHead = call->theNext;
- call->theNext = NULL;
- if (theTail == call)
- theTail = NULL;
-
- debugs(call->debugSection, call->debugLevel, "entering " << *call);
- call->make();
- debugs(call->debugSection, call->debugLevel, "leaving " << *call);
-}
-
AsyncCallQueue &
AsyncCallQueue::Instance()
{
#ifndef SQUID_ASYNCCALLQUEUE_H
#define SQUID_ASYNCCALLQUEUE_H
-#include "base/AsyncCall.h"
-
-//class AsyncCall;
+#include "base/AsyncCallList.h"
+#include "base/forward.h"
// The queue of asynchronous calls. All calls are fired during a single main
// loop iteration until the queue is exhausted
static AsyncCallQueue &Instance();
// make this async call when we get a chance
- void schedule(AsyncCall::Pointer &call);
+ void schedule(const AsyncCallPointer &call) { scheduled.add(call); }
// fire all scheduled calls; returns true if at least one was fired
bool fire();
private:
- AsyncCallQueue();
-
- void fireNext();
+ AsyncCallQueue() = default;
- AsyncCall::Pointer theHead;
- AsyncCall::Pointer theTail;
+ AsyncCallList scheduled; ///< calls waiting to be fire()d, in FIFO order
static AsyncCallQueue *TheInstance;
};
--- /dev/null
+/*
+ * Copyright (C) 1996-2022 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#include "squid.h"
+#include "base/AsyncCall.h"
+#include "base/DelayedAsyncCalls.h"
+#include "debug/Stream.h"
+
+void
+DelayedAsyncCalls::delay(const AsyncCall::Pointer &call)
+{
+ debugs(5, 3, call << " after " << deferredReads.size());
+ deferredReads.add(call);
+}
+
+void
+DelayedAsyncCalls::schedule()
+{
+ while (auto call = deferredReads.extract())
+ ScheduleCallHere(call);
+}
+
--- /dev/null
+/*
+ * Copyright (C) 1996-2022 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#ifndef SQUID_BASE_DELAYEDASYNCCALLS_H
+#define SQUID_BASE_DELAYEDASYNCCALLS_H
+
+#include "base/AsyncCallList.h"
+
+/// a FIFO list of async calls, all to be scheduled in FIFO order (on demand via
+/// the schedule() method or automatically at object destruction time)
+class DelayedAsyncCalls
+{
+public:
+ ~DelayedAsyncCalls() { schedule(); }
+
+ /// stores the given call to schedule it at schedule() or destruction time
+ void delay(const AsyncCallPointer &);
+
+ /// schedules and forgets all async calls previously stored by delay()
+ void schedule();
+
+private:
+ /// delay()-ed calls waiting to be scheduled, in delay() call order
+ AsyncCallList deferredReads;
+};
+
+#endif /* SQUID_BASE_DELAYEDASYNCCALLS_H */
+
Assure.h \
AsyncCall.cc \
AsyncCall.h \
+ AsyncCallList.cc \
+ AsyncCallList.h \
AsyncCallQueue.cc \
AsyncCallQueue.h \
AsyncCbdataCalls.h \
ClpMap.h \
CodeContext.cc \
CodeContext.h \
+ DelayedAsyncCalls.cc \
+ DelayedAsyncCalls.h \
EnumIterator.h \
File.cc \
File.h \
#ifndef SQUID_SRC_BASE_FORWARD_H
#define SQUID_SRC_BASE_FORWARD_H
+class AsyncCall;
class AsyncCallQueue;
class AsyncJob;
class CallDialer;
class CodeContext;
+class DelayedAsyncCalls;
class ScopedId;
class BadOptionalAccess;
class Raw;
typedef CbcPointer<AsyncJob> AsyncJobPointer;
typedef RefCount<CodeContext> CodeContextPointer;
+using AsyncCallPointer = RefCount<AsyncCall>;
#endif /* SQUID_SRC_BASE_FORWARD_H */
Must(bodyBytesRead >= 0);
}
+void
+Client::delayRead()
+{
+ using DeferredReadDialer = NullaryMemFunT<Client>;
+ AsyncCall::Pointer call = asyncCall(11, 5, "Client::noteDelayAwareReadChance",
+ DeferredReadDialer(this, &Client::noteDelayAwareReadChance));
+ entry->mem().delayRead(call);
+}
+
void
Client::addVirginReplyBody(const char *data, ssize_t len)
{
/// whether we may receive more virgin response body bytes
virtual bool mayReadVirginReplyBody() const = 0;
+ /// Called when a previously delayed dataConnection() read may be possible.
+ /// \sa delayRead()
+ virtual void noteDelayAwareReadChance() = 0;
+
/// Entry-dependent callbacks use this check to quit if the entry went bad
bool abortOnBadEntry(const char *abortReason);
void adjustBodyBytesRead(const int64_t delta);
+ /// Defer reading until it is likely to become possible.
+ /// Eventually, noteDelayAwareReadChance() will be called.
+ void delayRead();
+
// These should be private
int64_t currentOffset = 0; /**< Our current offset in the StoreEntry */
MemBuf *responseBodyBuffer = nullptr; /**< Data temporarily buffered for ICAP */
#include "squid.h"
#include "acl/FilledChecklist.h"
+#include "base/AsyncJobCalls.h"
+#include "base/Range.h"
#include "client_side.h"
#include "clients/FtpClient.h"
#include "comm/ConnOpener.h"
return data.conn;
}
+void
+Ftp::Client::noteDelayAwareReadChance()
+{
+ data.read_pending = false;
+ maybeReadVirginBody();
+}
+
void
Ftp::Client::maybeReadVirginBody()
{
debugs(9,5,"queueing read on FD " << data.conn->fd);
- typedef CommCbMemFunT<Client, CommIoCbParams> Dialer;
- entry->delayAwareRead(data.conn, data.readBuf->space(), read_sz,
- JobCallback(9, 5, Dialer, this, Ftp::Client::dataRead));
+ const auto amountToRead = entry->bytesWanted(Range<size_t>(0, read_sz));
+
+ if (amountToRead <= 0) {
+ delayRead();
+ return;
+ }
+
+ using ReadDialer = CommCbMemFunT<Client, CommIoCbParams>;
+ AsyncCall::Pointer readCallback = JobCallback(9, 5, ReadDialer, this, Client::dataRead);
+ comm_read(data.conn, data.readBuf->space(), amountToRead, readCallback);
}
void
virtual bool doneWithServer() const;
virtual const Comm::ConnectionPointer & dataConnection() const;
virtual void abortAll(const char *reason);
+ virtual void noteDelayAwareReadChance();
virtual Http::StatusCode failedHttpStatus(err_type &error);
void ctrlClosed(const CommCloseCbParams &io);
#include "comm/Read.h"
#include "comm/TcpAcceptor.h"
#include "comm/Write.h"
-#include "CommRead.h"
#include "compat/cmsg.h"
#include "DescriptorSet.h"
#include "event.h"
* + call read handlers with ERR_CLOSING
* + call closing handlers
*
- * NOTE: Comm::ERR_CLOSING will NOT be called for CommReads' sitting in a
- * DeferredReadManager.
+ * A deferred reader has no Comm read handler mentioned above. To stay in sync,
+ * such a reader must register a Comm closing handler.
*/
void
_comm_close(int fd, char const *file, int line)
// for (c = fd_table[fd].closeHandler; c; c = c->next)
// assert(c->handler != handler || c->data != data);
+ // TODO: Consider enhancing AsyncCallList to support random-access close
+ // handlers, perhaps after upgrading the remaining legacy CLCB handlers.
call->setNext(fd_table[fd].closeHandler);
fd_table[fd].closeHandler = call;
commPlanHalfClosedCheck(); // make sure this fd will be checked again
}
-CommRead::CommRead() : conn(NULL), buf(NULL), len(0), callback(NULL) {}
-
-CommRead::CommRead(const Comm::ConnectionPointer &c, char *buf_, int len_, AsyncCall::Pointer &callback_)
- : conn(c), buf(buf_), len(len_), callback(callback_) {}
-
-DeferredRead::DeferredRead () : theReader(NULL), theContext(NULL), theRead(), cancelled(false) {}
-
-DeferredRead::DeferredRead (DeferrableRead *aReader, void *data, CommRead const &aRead) : theReader(aReader), theContext (data), theRead(aRead), cancelled(false) {}
-
-DeferredReadManager::~DeferredReadManager()
-{
- flushReads();
- assert (deferredReads.empty());
-}
-
-/* explicit instantiation required for some systems */
-
-/// \cond AUTODOCS_IGNORE
-template cbdata_type CbDataList<DeferredRead>::CBDATA_CbDataList;
-/// \endcond
-
-void
-DeferredReadManager::delayRead(DeferredRead const &aRead)
-{
- debugs(5, 3, "Adding deferred read on " << aRead.theRead.conn);
- CbDataList<DeferredRead> *temp = deferredReads.push_back(aRead);
-
- // We have to use a global function as a closer and point to temp
- // instead of "this" because DeferredReadManager is not a job and
- // is not even cbdata protected
- // XXX: and yet we use cbdata protection functions on it??
- AsyncCall::Pointer closer = commCbCall(5,4,
- "DeferredReadManager::CloseHandler",
- CommCloseCbPtrFun(&CloseHandler, temp));
- comm_add_close_handler(aRead.theRead.conn->fd, closer);
- temp->element.closer = closer; // remember so that we can cancel
-}
-
-void
-DeferredReadManager::CloseHandler(const CommCloseCbParams ¶ms)
-{
- if (!cbdataReferenceValid(params.data))
- return;
-
- CbDataList<DeferredRead> *temp = (CbDataList<DeferredRead> *)params.data;
-
- temp->element.closer = NULL;
- if (temp->element.theRead.conn) {
- temp->element.theRead.conn->noteClosure();
- temp->element.theRead.conn = nullptr;
- }
- temp->element.markCancelled();
-}
-
-DeferredRead
-DeferredReadManager::popHead(CbDataListContainer<DeferredRead> &deferredReads)
-{
- assert (!deferredReads.empty());
-
- DeferredRead &read = deferredReads.head->element;
-
- // NOTE: at this point the connection has been paused/stalled for an unknown
- // amount of time. We must re-validate that it is active and usable.
-
- // If the connection has been closed already. Cancel this read.
- if (!fd_table || !Comm::IsConnOpen(read.theRead.conn)) {
- if (read.closer != NULL) {
- read.closer->cancel("Connection closed before.");
- read.closer = NULL;
- }
- read.markCancelled();
- }
-
- if (!read.cancelled) {
- comm_remove_close_handler(read.theRead.conn->fd, read.closer);
- read.closer = NULL;
- }
-
- DeferredRead result = deferredReads.pop_front();
-
- return result;
-}
-
-void
-DeferredReadManager::kickReads(int const count)
-{
- /* if we had CbDataList::size() we could consolidate this and flushReads */
-
- if (count < 1) {
- flushReads();
- return;
- }
-
- size_t remaining = count;
-
- while (!deferredReads.empty() && remaining) {
- DeferredRead aRead = popHead(deferredReads);
- kickARead(aRead);
-
- if (!aRead.cancelled)
- --remaining;
- }
-}
-
-void
-DeferredReadManager::flushReads()
-{
- CbDataListContainer<DeferredRead> reads;
- reads = deferredReads;
- deferredReads = CbDataListContainer<DeferredRead>();
-
- // XXX: For fairness this SHOULD randomize the order
- while (!reads.empty()) {
- DeferredRead aRead = popHead(reads);
- kickARead(aRead);
- }
-}
-
-void
-DeferredReadManager::kickARead(DeferredRead const &aRead)
-{
- if (aRead.cancelled)
- return;
-
- // TODO: This check still allows theReader call with a closed theRead.conn.
- // If a delayRead() caller has a close connection handler, then such a call
- // would be useless and dangerous. If a delayRead() caller does not have it,
- // then the caller will get stuck when an external connection closure makes
- // aRead.cancelled (checked above) true.
- if (Comm::IsConnOpen(aRead.theRead.conn) && fd_table[aRead.theRead.conn->fd].closing())
- return;
-
- debugs(5, 3, "Kicking deferred read on " << aRead.theRead.conn);
-
- aRead.theReader(aRead.theContext, aRead.theRead);
-}
-
-void
-DeferredRead::markCancelled()
-{
- cancelled = true;
-}
-
int
CommSelectEngine::checkEvents(int timeout)
{
AggregateId (RefCount<Aggregate>);
virtual int bytesWanted (int min, int max) const;
virtual void bytesIn(int qty);
- virtual void delayRead(DeferredRead const &);
+ virtual void delayRead(const AsyncCallPointer &);
private:
RefCount<Aggregate> theAggregate;
};
void
-Aggregate::AggregateId::delayRead(DeferredRead const &aRead)
+Aggregate::AggregateId::delayRead(const AsyncCall::Pointer &aRead)
{
theAggregate->delayRead(aRead);
}
void
DelayPools::FreeDelayData()
{
- eventDelete(DelayPools::Update, NULL);
delete[] DelayPools::delay_data;
pools_ = 0;
}
void
DelayPools::Update(void *)
{
- if (!pools())
+ // To prevent stuck transactions, stop updates only after no new transactions can
+ // register (because the pools were disabled) and the last registered transaction is gone.
+ if (!pools() && toUpdate.empty())
return;
eventAdd("DelayPools::Update", Update, NULL, 1.0, 1);
/* DEBUG: section 10 Gopher */
#include "squid.h"
+#include "base/AsyncCbdataCalls.h"
#include "comm.h"
#include "comm/Read.h"
#include "comm/Write.h"
~GopherStateData();
+ /// queues or defers a read call
+ static void DelayAwareRead(GopherStateData *);
+
/// URL for icon to display (or nil), given the Gopher item-type code.
/// The returned c-string is invalidated by the next call to this function.
const char *iconUrl(char);
} else {
entry->append(buf, len);
}
- AsyncCall::Pointer call = commCbCall(5,4, "gopherReadReply",
- CommIoCbPtrFun(gopherReadReply, gopherState));
- comm_read(conn, buf, read_sz, call);
+ GopherStateData::DelayAwareRead(gopherState);
}
}
+void
+GopherStateData::DelayAwareRead(GopherStateData *gopherState)
+{
+ const auto &conn = gopherState->serverConn;
+
+ if (!Comm::IsConnOpen(conn) || fd_table[conn->fd].closing()) {
+ debugs(10, 3, "will not read from " << conn);
+ return;
+ }
+
+ const auto amountToRead = gopherState->entry->bytesWanted(Range<size_t>(0, BUFSIZ));
+
+ if (amountToRead <= 0) {
+ AsyncCall::Pointer delayCall = asyncCall(10, 3, "GopherStateData::DelayAwareRead",
+ cbdataDialer(&GopherStateData::DelayAwareRead, gopherState));
+ gopherState->entry->mem().delayRead(delayCall);
+ return;
+ }
+
+ AsyncCall::Pointer readCall = commCbCall(5, 5, "gopherReadReply", CommIoCbPtrFun(gopherReadReply, gopherState));
+ comm_read(conn, gopherState->replybuf, amountToRead, readCall);
+}
+
/**
* This will be called when request write is complete. Schedule read of reply.
*/
entry->flush();
}
- /* Schedule read reply. */
- AsyncCall::Pointer call = commCbCall(5,5, "gopherReadReply",
- CommIoCbPtrFun(gopherReadReply, gopherState));
- entry->delayAwareRead(conn, gopherState->replybuf, BUFSIZ, call);
+ GopherStateData::DelayAwareRead(gopherState);
}
/**
#include "squid.h"
#include "acl/FilledChecklist.h"
#include "base/AsyncJobCalls.h"
+#include "base/DelayedAsyncCalls.h"
#include "base/Raw.h"
#include "base/TextException.h"
#include "base64.h"
#include "comm/Connection.h"
#include "comm/Read.h"
#include "comm/Write.h"
-#include "CommRead.h"
#include "error/Detail.h"
#include "errorpage.h"
#include "fd.h"
return statusIfComplete();
}
-static void
-readDelayed(void *context, CommRead const &)
+void
+HttpStateData::noteDelayAwareReadChance()
{
- HttpStateData *state = static_cast<HttpStateData*>(context);
- state->flags.do_next_read = true;
- state->maybeReadVirginBody();
+ flags.do_next_read = true;
+ maybeReadVirginBody();
}
void
rd.size = entry->bytesWanted(Range<size_t>(0, inBuf.spaceSize()));
if (rd.size <= 0) {
- assert(entry->mem_obj);
- AsyncCall::Pointer nilCall;
- entry->mem_obj->delayRead(DeferredRead(readDelayed, this, CommRead(io.conn, NULL, 0, nilCall)));
+ delayRead();
return;
}
void processSurrogateControl(HttpReply *);
protected:
+ /* Client API */
+ virtual void noteDelayAwareReadChance();
+
void processReply();
void proceedAfter1xx();
void handle1xx(HttpReply *msg);
#include "AccessLogEntry.h"
#include "base/AsyncJobCalls.h"
#include "base/TextException.h"
+#include "comm.h"
#include "comm/Connection.h"
#include "CommCalls.h"
#include "errorpage.h"
#include "squid.h"
#include "base/AsyncCbdataCalls.h"
#include "base/TextException.h"
+#include "comm.h"
#include "comm/Connection.h"
#include "comm/Write.h"
#include "CommCalls.h"
return storeKeyText((const cache_key *)key);
}
-#include "comm.h"
-
-void
-StoreEntry::DeferReader(void *theContext, CommRead const &aRead)
-{
- StoreEntry *anEntry = (StoreEntry *)theContext;
- anEntry->delayAwareRead(aRead.conn,
- aRead.buf,
- aRead.len,
- aRead.callback);
-}
-
-void
-StoreEntry::delayAwareRead(const Comm::ConnectionPointer &conn, char *buf, int len, AsyncCall::Pointer callback)
-{
- size_t amountToRead = bytesWanted(Range<size_t>(0, len));
- /* sketch: readdeferer* = getdeferer.
- * ->deferRead (fd, buf, len, callback, DelayAwareRead, this)
- */
-
- if (amountToRead <= 0) {
- assert (mem_obj);
- mem_obj->delayRead(DeferredRead(DeferReader, this, CommRead(conn, buf, len, callback)));
- return;
- }
-
- if (fd_table[conn->fd].closing()) {
- // Readers must have closing callbacks if they want to be notified. No
- // readers appeared to care around 2009/12/14 as they skipped reading
- // for other reasons. Closing may already be true at the delyaAwareRead
- // call time or may happen while we wait after delayRead() above.
- debugs(20, 3, "will not read from closing " << conn << " for " << callback);
- return; // the read callback will never be called
- }
-
- comm_read(conn, buf, amountToRead, callback);
-}
-
size_t
StoreEntry::bytesWanted (Range<size_t> const aRange, bool ignoreDelayPools) const
{
DelayId::DelayId(): pool_(0), compositeId(NULL), markedAsNoDelay(false) {}
DelayId::~DelayId() {}
-void DelayId::delayRead(DeferredRead const&) STUB_NOP
+void DelayId::delayRead(const AsyncCallPointer &) STUB_NOP
void BandwidthBucket::refillBucket() STUB
bool BandwidthBucket::applyQuota(int &, Comm::IoCallback *) STUB_RETVAL(false)
BandwidthBucket *BandwidthBucket::SelectBucket(fde *) STUB_RETVAL(nullptr)
const char *MemObject::logUri() const STUB_RETVAL(NULL)
void MemObject::setUris(char const *, char const *, const HttpRequestMethod &) STUB
void MemObject::reset() STUB
-void MemObject::delayRead(DeferredRead const &) STUB
+void MemObject::delayRead(const AsyncCallPointer &) STUB
bool MemObject::readAheadPolicyCanRead() const STUB_RETVAL(false)
void MemObject::setNoDelay(bool const) STUB
MemObject::~MemObject() STUB
// void comm_read(const Comm::ConnectionPointer &, char *, int, IOCB *, void *) STUB
// void comm_read(const Comm::ConnectionPointer &, char*, int, AsyncCall::Pointer &) STUB
-/* should be in stub_CommRead */
-#include "CommRead.h"
-CommRead::CommRead(const Comm::ConnectionPointer &, char *, int, AsyncCall::Pointer &) STUB
-CommRead::CommRead() STUB
-DeferredReadManager::~DeferredReadManager() STUB
-DeferredRead::DeferredRead(DeferrableRead *, void *, CommRead const &) STUB
-void DeferredReadManager::delayRead(DeferredRead const &) STUB
-void DeferredReadManager::kickReads(int const) STUB
+/* should be in stub_libbase */
+#include "base/DelayedAsyncCalls.h"
+void DelayedAsyncCalls::delay(const AsyncCall::Pointer &) STUB
+void DelayedAsyncCalls::schedule() STUB
#include "comm.h"
bool comm_iocallbackpending(void) STUB_RETVAL(false)
void StoreEntry::unregisterAbortCallback(const char *) STUB
void StoreEntry::destroyMemObject() STUB
int StoreEntry::checkTooSmall() STUB_RETVAL(0)
-void StoreEntry::delayAwareRead(const Comm::ConnectionPointer&, char *, int, AsyncCall::Pointer) STUB
void StoreEntry::setNoDelay (bool const) STUB
bool StoreEntry::modifiedSince(const time_t, const int) const STUB_RETVAL(false)
bool StoreEntry::hasIfMatchEtag(const HttpRequest &) const STUB_RETVAL(false)
int bw = from.bytesWanted(1, SQUID_TCP_SO_RCVBUF);
// XXX: Delay pools must not delay client-to-Squid traffic (i.e. when
// from.readPendingFunc is tunnelDelayedClientRead()).
- // XXX: Bug #4913: Use DeferredRead instead.
+ // XXX: Bug #4913: For delay pools, use delayRead() API instead.
if (bw == 1 && ++from.delayedLoops < 10) {
from.readPending = this;
eventAdd("tunnelDelayedServerRead", from.readPendingFunc, from.readPending, 0.3, true);