+/*
+ * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
-#ifndef SQUID_BODY_PIPE_H
-#define SQUID_BODY_PIPE_H
+#ifndef SQUID_SRC_BODYPIPE_H
+#define SQUID_SRC_BODYPIPE_H
-#include "MemBuf.h"
-#include "AsyncCall.h"
#include "base/AsyncJob.h"
+#include "base/CbcPointer.h"
+#include "MemBuf.h"
class BodyPipe;
-// Interface for those who want to produce body content for others.
-// BodyProducer is expected to create the BodyPipe.
-// One pipe cannot have more than one producer.
+/** Interface for those who want to produce body content for others.
+ * BodyProducer is expected to create the BodyPipe.
+ * One pipe cannot have more than one producer.
+ */
class BodyProducer: virtual public AsyncJob
{
public:
+ typedef CbcPointer<BodyProducer> Pointer;
+
BodyProducer():AsyncJob("BodyProducer") {}
- virtual ~BodyProducer() {}
+ ~BodyProducer() override {}
virtual void noteMoreBodySpaceAvailable(RefCount<BodyPipe> bp) = 0;
virtual void noteBodyConsumerAborted(RefCount<BodyPipe> bp) = 0;
protected:
- void stopProducingFor(RefCount<BodyPipe> &pipe, bool atEof);
+ void stopProducingFor(RefCount<BodyPipe> &, bool atEof);
};
-// Interface for those who want to consume body content from others.
-// BodyConsumer is expected to register with an existing BodyPipe
-// by calling BodyPipe::setConsumer().
-// One pipe cannot have more than one consumer.
+/** Interface for those who want to consume body content from others.
+ * BodyConsumer is expected to register with an existing BodyPipe
+ * by calling BodyPipe::setConsumer().
+ * One pipe cannot have more than one consumer.
+ */
class BodyConsumer: virtual public AsyncJob
{
public:
+ typedef CbcPointer<BodyConsumer> Pointer;
+
BodyConsumer():AsyncJob("BodyConsumer") {}
- virtual ~BodyConsumer() {}
+ ~BodyConsumer() override {}
virtual void noteMoreBodyDataAvailable(RefCount<BodyPipe> bp) = 0;
virtual void noteBodyProductionEnded(RefCount<BodyPipe> bp) = 0;
virtual void noteBodyProducerAborted(RefCount<BodyPipe> bp) = 0;
protected:
- void stopConsumingFrom(RefCount<BodyPipe> &pipe);
+ void stopConsumingFrom(RefCount<BodyPipe> &);
};
-// Makes raw buffer checkin/checkout interface efficient and exception-safe.
-// Either append or consume operations can be performed on a checkedout buffer.
+/** Makes raw buffer checkin/checkout interface efficient and exception-safe.
+ * Either append or consume operations can be performed on a checkedout buffer.
+ */
class BodyPipeCheckout
{
public:
friend class BodyPipe;
public:
- BodyPipeCheckout(BodyPipe &pipe); // checks out
+ BodyPipeCheckout(BodyPipe &); // checks out
~BodyPipeCheckout(); // aborts checkout unless checkedIn
void checkIn();
public:
- BodyPipe &pipe;
+ BodyPipe &thePipe;
MemBuf &buf;
const uint64_t offset; // of current content, relative to the body start
BodyPipeCheckout &operator =(const BodyPipeCheckout &); // prevent assignment
};
-// Connects those who produces message body content with those who
-// consume it. For example, connects ConnStateData with FtpStateData OR
-// ICAPModXact with HttpStateData.
+/** Connects those who produces message body content with those who
+ * consume it. For example, connects ConnStateData with FtpStateData OR
+ * ICAPModXact with HttpStateData.
+ */
class BodyPipe: public RefCountable
{
+ MEMPROXY_CLASS(BodyPipe);
+
public:
typedef RefCount<BodyPipe> Pointer;
typedef BodyProducer Producer;
typedef BodyConsumer Consumer;
typedef BodyPipeCheckout Checkout;
- enum { MaxCapacity = SQUID_TCP_SO_RCVBUF };
+ static constexpr size_t MaxCapacity = 64*1024;
friend class BodyPipeCheckout;
public:
BodyPipe(Producer *aProducer);
- ~BodyPipe(); // asserts that producer and consumer are cleared
+ ~BodyPipe() override; // asserts that producer and consumer are cleared
void setBodySize(uint64_t aSize); // set body size
bool bodySizeKnown() const { return theBodySize >= 0; }
uint64_t bodySize() const;
uint64_t consumedSize() const { return theGetSize; }
+ uint64_t producedSize() const { return thePutSize; }
bool productionEnded() const { return !theProducer; }
// called by producers
bool mayNeedMoreData() const { return !bodySizeKnown() || needsMoreData(); }
bool needsMoreData() const { return bodySizeKnown() && unproducedSize() > 0; }
uint64_t unproducedSize() const; // size of still unproduced data
- bool stillProducing(const Producer *producer) const { return theProducer == producer; }
+ bool stillProducing(const Producer::Pointer &producer) const { return theProducer == producer; }
+ void expectProductionEndAfter(uint64_t extraSize); ///< sets or checks body size
// called by consumers
- bool setConsumerIfNotLate(Consumer *aConsumer);
+ bool setConsumerIfNotLate(const Consumer::Pointer &aConsumer);
void clearConsumer(); // aborts if still piping
+ void expectNoConsumption(); ///< there will be no more setConsumer() calls
size_t getMoreData(MemBuf &buf);
void consume(size_t size);
bool expectMoreAfter(uint64_t offset) const;
bool exhausted() const; // saw eof/abort and all data consumed
- bool stillConsuming(const Consumer *consumer) const { return theConsumer == consumer; }
+ bool stillConsuming(const Consumer::Pointer &consumer) const { return theConsumer == consumer; }
- // start or continue consuming when there is no consumer
+ /// start or continue consuming when producing without consumer
void enableAutoConsumption();
const MemBuf &buf() const { return theBuf; }
void postConsume(size_t size);
void postAppend(size_t size);
- void startAutoConsumption(); // delayed start of enabled consumption
+ void startAutoConsumptionIfNeeded();
private:
int64_t theBodySize; // expected total content length, if known
- Producer *theProducer; // content producer, if any
- Consumer *theConsumer; // content consumer, if any
+ Producer::Pointer theProducer; // content producer, if any
+ Consumer::Pointer theConsumer; // content consumer, if any
uint64_t thePutSize; // ever-increasing total
uint64_t theGetSize; // ever-increasing total
MemBuf theBuf; // produced but not yet consumed content, if any
- bool mustAutoConsume; // consume when there is no consumer
+ bool mustAutoConsume; ///< keep theBuf empty when producing without consumer
+ bool abortedConsumption; ///< called BodyProducer::noteBodyConsumerAborted
bool isCheckedOut; // to keep track of checkout violations
-
- CBDATA_CLASS2(BodyPipe);
};
-#endif /* SQUID_BODY_PIPE_H */
+#endif /* SQUID_SRC_BODYPIPE_H */
+