X-Git-Url: http://git.ipfire.org/?a=blobdiff_plain;f=src%2FBodyPipe.h;h=e608bd615e8298557f9508f6090d0130a3cdaf3a;hb=0c060c02e48a1fedfb5c163f8ec54b8aa7f68588;hp=c56a6cdb112bda0bebc8e76ad578b79d433728b5;hpb=ebfcda6a35ec42415b14a2b42f068a22a368fe4c;p=thirdparty%2Fsquid.git diff --git a/src/BodyPipe.h b/src/BodyPipe.h index c56a6cdb11..e608bd615e 100644 --- a/src/BodyPipe.h +++ b/src/BodyPipe.h @@ -1,148 +1,172 @@ - -#ifndef SQUID_BODY_PIPE_H -#define SQUID_BODY_PIPE_H - +/* + * Copyright (C) 1996-2023 The Squid Software Foundation and contributors + * + * Squid software is distributed under GPLv2+ license and includes + * contributions from numerous individuals and organizations. + * Please see the COPYING and CONTRIBUTORS files for details. + */ + +#ifndef SQUID_SRC_BODYPIPE_H +#define SQUID_SRC_BODYPIPE_H + +#include "base/AsyncJob.h" +#include "base/CbcPointer.h" #include "MemBuf.h" -#include "AsyncCall.h" -#include "ICAP/AsyncJob.h" class BodyPipe; -// Interface for those who want to produce body content for others. -// BodyProducer is expected to create the BodyPipe. -// One pipe cannot have more than one producer. -class BodyProducer: virtual public AsyncJob { - public: - BodyProducer():AsyncJob("BodyProducer"){} - virtual ~BodyProducer() {} +/** Interface for those who want to produce body content for others. + * BodyProducer is expected to create the BodyPipe. + * One pipe cannot have more than one producer. + */ +class BodyProducer: virtual public AsyncJob +{ +public: + typedef CbcPointer Pointer; - virtual void noteMoreBodySpaceAvailable(RefCount bp) = 0; - virtual void noteBodyConsumerAborted(RefCount bp) = 0; + BodyProducer():AsyncJob("BodyProducer") {} + ~BodyProducer() override {} - protected: - void stopProducingFor(RefCount &pipe, bool atEof); -}; + virtual void noteMoreBodySpaceAvailable(RefCount bp) = 0; + virtual void noteBodyConsumerAborted(RefCount bp) = 0; -// Interface for those who want to consume body content from others. -// BodyConsumer is expected to register with an existing BodyPipe -// by calling BodyPipe::setConsumer(). -// One pipe cannot have more than one consumer. -class BodyConsumer: virtual public AsyncJob { - public: - BodyConsumer():AsyncJob("BodyConsumer"){} - virtual ~BodyConsumer() {} - - virtual void noteMoreBodyDataAvailable(RefCount bp) = 0; - virtual void noteBodyProductionEnded(RefCount bp) = 0; - virtual void noteBodyProducerAborted(RefCount bp) = 0; - - protected: - void stopConsumingFrom(RefCount &pipe); +protected: + void stopProducingFor(RefCount &, bool atEof); }; -// Makes raw buffer checkin/checkout interface efficient and exception-safe. -// Either append or consume operations can be performed on a checkedout buffer. -class BodyPipeCheckout { - public: - friend class BodyPipe; - - public: - BodyPipeCheckout(BodyPipe &pipe); // checks out - ~BodyPipeCheckout(); // aborts checkout unless checkedIn - - void checkIn(); - - public: - BodyPipe &pipe; - MemBuf &buf; - const uint64_t offset; // of current content, relative to the body start - - protected: - const size_t checkedOutSize; - bool checkedIn; - - private: - BodyPipeCheckout(const BodyPipeCheckout &); // prevent copying - BodyPipeCheckout &operator =(const BodyPipeCheckout &); // prevent assignment +/** Interface for those who want to consume body content from others. + * BodyConsumer is expected to register with an existing BodyPipe + * by calling BodyPipe::setConsumer(). + * One pipe cannot have more than one consumer. + */ +class BodyConsumer: virtual public AsyncJob +{ +public: + typedef CbcPointer Pointer; + + BodyConsumer():AsyncJob("BodyConsumer") {} + ~BodyConsumer() override {} + + virtual void noteMoreBodyDataAvailable(RefCount bp) = 0; + virtual void noteBodyProductionEnded(RefCount bp) = 0; + virtual void noteBodyProducerAborted(RefCount bp) = 0; + +protected: + void stopConsumingFrom(RefCount &); }; -// Connects those who produces message body content with those who -// consume it. For example, connects ConnStateData with FtpStateData OR -// ICAPModXact with HttpStateData. -class BodyPipe: public RefCountable { - public: - typedef RefCount Pointer; - typedef BodyProducer Producer; - typedef BodyConsumer Consumer; - typedef BodyPipeCheckout Checkout; - - enum { MaxCapacity = SQUID_TCP_SO_RCVBUF }; - - friend class BodyPipeCheckout; - - public: - BodyPipe(Producer *aProducer); - ~BodyPipe(); // asserts that producer and consumer are cleared +/** Makes raw buffer checkin/checkout interface efficient and exception-safe. + * Either append or consume operations can be performed on a checkedout buffer. + */ +class BodyPipeCheckout +{ +public: + friend class BodyPipe; - void setBodySize(uint64_t aSize); // set body size - bool bodySizeKnown() const { return theBodySize >= 0; } - uint64_t bodySize() const; - uint64_t consumedSize() const { return theGetSize; } - bool productionEnded() const { return !theProducer; } +public: + BodyPipeCheckout(BodyPipe &); // checks out + ~BodyPipeCheckout(); // aborts checkout unless checkedIn - // called by producers - void clearProducer(bool atEof); // aborts or sends eof - size_t putMoreData(const char *buf, size_t size); - bool mayNeedMoreData() const { return !bodySizeKnown() || needsMoreData(); } - bool needsMoreData() const { return bodySizeKnown() && unproducedSize() > 0; } - uint64_t unproducedSize() const; // size of still unproduced data - bool stillProducing(Producer *producer) const { return theProducer == producer; } + void checkIn(); - // called by consumers - bool setConsumerIfNotLate(Consumer *aConsumer); - void clearConsumer(); // aborts if still piping - size_t getMoreData(MemBuf &buf); - void consume(size_t size); - bool expectMoreAfter(uint64_t offset) const; - bool exhausted() const; // saw eof/abort and all data consumed - bool stillConsuming(Consumer *consumer) const { return theConsumer == consumer; } +public: + BodyPipe &thePipe; + MemBuf &buf; + const uint64_t offset; // of current content, relative to the body start - // start or continue consuming when there is no consumer - void enableAutoConsumption(); +protected: + const size_t checkedOutSize; + bool checkedIn; - const MemBuf &buf() const { return theBuf; } - - const char *status() const; // for debugging only - - protected: - // lower-level interface used by Checkout - MemBuf &checkOut(); // obtain raw buffer - void checkIn(Checkout &checkout); // return updated raw buffer - void undoCheckOut(Checkout &checkout); // undo checkout efffect - - void scheduleBodyDataNotification(); - void scheduleBodyEndNotification(); - - // keep counters in sync and notify producer or consumer - void postConsume(size_t size); - void postAppend(size_t size); - - void startAutoConsumption(); // delayed start of enabled consumption - - private: - int64_t theBodySize; // expected total content length, if known - Producer *theProducer; // content producer, if any - Consumer *theConsumer; // content consumer, if any - - uint64_t thePutSize; // ever-increasing total - uint64_t theGetSize; // ever-increasing total - - MemBuf theBuf; // produced but not yet consumed content, if any - - bool mustAutoConsume; // consume when there is no consumer - bool isCheckedOut; // to keep track of checkout violations +private: + BodyPipeCheckout(const BodyPipeCheckout &); // prevent copying + BodyPipeCheckout &operator =(const BodyPipeCheckout &); // prevent assignment +}; - CBDATA_CLASS2(BodyPipe); +/** Connects those who produces message body content with those who + * consume it. For example, connects ConnStateData with FtpStateData OR + * ICAPModXact with HttpStateData. + */ +class BodyPipe: public RefCountable +{ + MEMPROXY_CLASS(BodyPipe); + +public: + typedef RefCount Pointer; + typedef BodyProducer Producer; + typedef BodyConsumer Consumer; + typedef BodyPipeCheckout Checkout; + + static constexpr size_t MaxCapacity = 64*1024; + + friend class BodyPipeCheckout; + +public: + BodyPipe(Producer *aProducer); + ~BodyPipe() override; // asserts that producer and consumer are cleared + + void setBodySize(uint64_t aSize); // set body size + bool bodySizeKnown() const { return theBodySize >= 0; } + uint64_t bodySize() const; + uint64_t consumedSize() const { return theGetSize; } + uint64_t producedSize() const { return thePutSize; } + bool productionEnded() const { return !theProducer; } + + // called by producers + void clearProducer(bool atEof); // aborts or sends eof + size_t putMoreData(const char *buf, size_t size); + bool mayNeedMoreData() const { return !bodySizeKnown() || needsMoreData(); } + bool needsMoreData() const { return bodySizeKnown() && unproducedSize() > 0; } + uint64_t unproducedSize() const; // size of still unproduced data + bool stillProducing(const Producer::Pointer &producer) const { return theProducer == producer; } + void expectProductionEndAfter(uint64_t extraSize); ///< sets or checks body size + + // called by consumers + bool setConsumerIfNotLate(const Consumer::Pointer &aConsumer); + void clearConsumer(); // aborts if still piping + void expectNoConsumption(); ///< there will be no more setConsumer() calls + size_t getMoreData(MemBuf &buf); + void consume(size_t size); + bool expectMoreAfter(uint64_t offset) const; + bool exhausted() const; // saw eof/abort and all data consumed + bool stillConsuming(const Consumer::Pointer &consumer) const { return theConsumer == consumer; } + + /// start or continue consuming when producing without consumer + void enableAutoConsumption(); + + const MemBuf &buf() const { return theBuf; } + + const char *status() const; // for debugging only + +protected: + // lower-level interface used by Checkout + MemBuf &checkOut(); // obtain raw buffer + void checkIn(Checkout &checkout); // return updated raw buffer + void undoCheckOut(Checkout &checkout); // undo checkout efffect + + void scheduleBodyDataNotification(); + void scheduleBodyEndNotification(); + + // keep counters in sync and notify producer or consumer + void postConsume(size_t size); + void postAppend(size_t size); + + void startAutoConsumptionIfNeeded(); + +private: + int64_t theBodySize; // expected total content length, if known + Producer::Pointer theProducer; // content producer, if any + Consumer::Pointer theConsumer; // content consumer, if any + + uint64_t thePutSize; // ever-increasing total + uint64_t theGetSize; // ever-increasing total + + MemBuf theBuf; // produced but not yet consumed content, if any + + bool mustAutoConsume; ///< keep theBuf empty when producing without consumer + bool abortedConsumption; ///< called BodyProducer::noteBodyConsumerAborted + bool isCheckedOut; // to keep track of checkout violations }; -#endif /* SQUID_BODY_PIPE_H */ +#endif /* SQUID_SRC_BODYPIPE_H */ +