#include "squid.h"
+#include <libecap/common/area.h>
+#include <libecap/common/delay.h>
#include <libecap/adapter/xaction.h>
#include "TextException.h"
#include "assert.h"
const Adaptation::ServicePointer &aService):
AsyncJob("Ecap::XactionRep"),
Adaptation::Initiate("Ecap::XactionRep", anInitiator, aService),
- theVirgin(virginHeader), theCause(virginCause),
- theVirginRep(theVirgin, NULL), theCauseRep(NULL),
- theAnswerRep(NULL)
+ theVirginRep(virginHeader), theCauseRep(NULL)
{
if (virginCause)
- theCauseRep = new MessageRep(theCause, NULL);
+ theCauseRep = new MessageRep(virginCause);
}
Ecap::XactionRep::~XactionRep()
{
assert(!theMaster);
delete theCauseRep;
- delete theAnswerRep;
+ theAnswerRep.reset();
}
void
Ecap::XactionRep::start()
{
Must(theMaster);
+
+ // register as a consumer if there is a body
+ // we do not actually consume unless the adapter tells us to
+ BodyPipePointer &p = theVirginRep.raw().body_pipe;
+ Must(!p || p->setConsumerIfNotLate(this));
+
theMaster->start();
}
void
-Ecap::XactionRep::swangSong()
+Ecap::XactionRep::swanSong()
{
terminateMaster();
Adaptation::Initiate::swanSong();
return theVirginRep;
}
-const libecap::Message *
+const libecap::Message &
Ecap::XactionRep::cause()
{
- return theCauseRep;
+ Must(theCauseRep != NULL);
+ return *theCauseRep;
}
-void
-Ecap::XactionRep::useVirgin()
+libecap::Message &
+Ecap::XactionRep::adapted()
{
- theMaster.reset();
- Adaptation::Message::ShortCircuit(theVirgin, theAnswer);
- Must(!theVirgin.body_pipe == !theAnswer.body_pipe);
- sendAnswer(theAnswer.header);
+ Must(theAnswerRep != NULL);
+ return *theAnswerRep;
+}
+
+Adaptation::Message &
+Ecap::XactionRep::answer()
+{
+ MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
+ Must(rep);
+ return rep->raw();
+}
+
+bool
+Ecap::XactionRep::doneAll() const
+{
+ if (theMaster) {
+ if (!doneWithAdapted() || sendingVirgin())
+ return false;
+ }
+
+ return Adaptation::Initiate::doneAll();
+}
+
+// are we still sending virgin body to theMaster?
+bool
+Ecap::XactionRep::doneWithAdapted() const
+{
+ if (!theAnswerRep)
+ return false;
+
+ // we are not done if we are producing
+ MessageRep *answer = dynamic_cast<MessageRep*>(theAnswerRep.get());
+ Must(answer);
+ const BodyPipePointer &ap = answer->raw().body_pipe;
+ return !ap || !ap->stillProducing(this);
+}
+
+// are we still sending virgin body to theMaster?
+bool
+Ecap::XactionRep::sendingVirgin() const
+{
+ // we are sending if we are consuming
+ const BodyPipePointer &vp = theVirginRep.raw().body_pipe;
+ return vp != NULL && vp->stillConsuming(this);
+}
+
+// stops sending virgin to theMaster and enables auto-consumption
+void
+Ecap::XactionRep::dropVirgin(const char *reason)
+{
+ debugs(93,4, HERE << "because " << reason);
+
+ BodyPipePointer &p = theVirginRep.raw().body_pipe;
+ Must(p != NULL);
+ Must(p->stillConsuming(this));
+ stopConsumingFrom(p);
+ p->enableAutoConsumption();
+ if (doneWithAdapted())
+ theMaster.reset();
}
void
-Ecap::XactionRep::adaptVirgin()
+Ecap::XactionRep::useVirgin()
{
+ debugs(93,3, HERE << status());
+
// XXX: check state everywhere
Must(!theAnswerRep);
- Must(!theAnswer.header);
- Must(!theAnswer.body_pipe);
- theAnswer.set(theVirgin.header->clone());
- theAnswerRep = new MessageRep(theAnswer, this);
- Must(!theAnswer.body_pipe);
+ theMaster.reset();
+
+ HttpMsg *answer = theVirginRep.raw().header->clone();
+ Must(!theVirginRep.raw().body_pipe == !answer->body_pipe); // check clone()
+
+ if (answer->body_pipe != NULL) {
+ // if libecap consumed, we cannot shortcircuit
+ Must(!answer->body_pipe->consumedSize());
+ Must(answer->body_pipe->stillConsuming(this));
+ stopConsumingFrom(answer->body_pipe);
+ }
+
+ sendAnswer(answer);
+ Must(done());
}
void
-Ecap::XactionRep::adaptNewRequest()
+Ecap::XactionRep::useAdapted(const libecap::shared_ptr<libecap::Message> &m)
{
- theAnswer.set(new HttpRequest);
- theAnswerRep = new MessageRep(theAnswer, this);
+ debugs(93,3, HERE << status());
+ theAnswerRep = m;
+ MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
+ Must(rep);
+ HttpMsg *answer = rep->raw().header;
+ if (!theAnswerRep->body()) {
+ if (!sendingVirgin())
+ theMaster.reset();
+ sendAnswer(answer);
+ } else {
+ Must(!answer->body_pipe); // only host can set body pipes
+ rep->tieBody(this);
+ debugs(93,4, HERE << "adapter will produce body" << status());
+ // libecap will produce
+ sendAnswer(answer);
+ }
}
-void
-Ecap::XactionRep::adaptNewResponse()
+// if adapter does not want to consume, we should not either
+void
+Ecap::XactionRep::adapterWontConsume()
{
- theAnswer.set(new HttpReply);
- theAnswerRep = new MessageRep(theAnswer, this);
+ if (sendingVirgin())
+ dropVirgin("adapterWontConsume");
}
-libecap::Message *
-Ecap::XactionRep::adapted()
+void
+Ecap::XactionRep::adapterWillConsume()
{
- return theAnswerRep;
+ Must(sendingVirgin());
+ theMaster->noteVirginDataAvailable(); // XXX: async
}
-void
-Ecap::XactionRep::useAdapted()
+void
+Ecap::XactionRep::adapterDoneConsuming()
{
- theMaster.reset();
- sendAnswer(theAnswer.header);
+ if (sendingVirgin())
+ dropVirgin("adapterDoneConsuming");
+}
+
+void
+Ecap::XactionRep::consumeVirgin(size_type n)
+{
+ BodyPipePointer &p = theVirginRep.raw().body_pipe;
+ Must(p != NULL);
+ const size_t size = static_cast<size_t>(n); // XXX: check for overflow
+ const size_t sizeMax = static_cast<size_t>(p->buf().contentSize()); // TODO: make MemBuf use size_t?
+ p->consume(min(size, sizeMax));
+}
+
+void
+Ecap::XactionRep::pauseVirginProduction()
+{
+ // TODO: support production pauses
+}
+
+void
+Ecap::XactionRep::resumeVirginProduction()
+{
+ // TODO: support production pauses
+}
+
+void
+Ecap::XactionRep::setAdaptedBodySize(const libecap::BodySize &size)
+{
+ Must(answer().body_pipe != NULL);
+ if (size.known())
+ answer().body_pipe->setBodySize(size.value());
+ // else the piped body size is unknown by default
+}
+
+void
+Ecap::XactionRep::appendAdapted(const libecap::Area &area)
+{
+ BodyPipe *p = answer().body_pipe.getRaw();
+ Must(p);
+ Must(p->putMoreData(area.start, area.size) == area.size);
+}
+
+bool
+Ecap::XactionRep::callable() const
+{
+ return !done();
+}
+
+void
+Ecap::XactionRep::noteAdaptedBodyEnd()
+{
+ Must(answer().body_pipe != NULL);
+ answer().body_pipe->clearProducer(true);
+ if (!sendingVirgin())
+ theMaster.reset();
+}
+
+void
+Ecap::XactionRep::adaptationDelayed(const libecap::Delay &d)
+{
+ debugs(93,3, HERE << "adapter needs time: " <<
+ d.state << '/' << d.progress);
+ // XXX: set timeout?
}
void
-Ecap::XactionRep::useNone()
+Ecap::XactionRep::adaptationAborted()
{
theMaster.reset();
tellQueryAborted(true); // should eCAP support retries?
void
Ecap::XactionRep::noteBodyConsumerAborted(RefCount<BodyPipe> bp)
{
- Must(theMaster);
- theMaster->noteAdaptedAborted();
+ terminateMaster();
}
void
void
Ecap::XactionRep::noteBodyProducerAborted(RefCount<BodyPipe> bp)
{
- Must(theMaster);
- theMaster->noteVirginAborted();
+ terminateMaster();
}
void
const char *Ecap::XactionRep::status() const
{
- return Adaptation::Initiate::status();
+ static MemBuf buf;
+ buf.reset();
+
+ buf.append(" [", 2);
+
+ if (theAnswerRep != NULL) {
+ MessageRep *answer = dynamic_cast<MessageRep*>(theAnswerRep.get());
+ Must(answer);
+ const BodyPipePointer &ap = answer->raw().body_pipe;
+ if (ap != NULL && ap->stillProducing(this))
+ buf.append("Ab ", 3);
+ else
+ buf.append("A. ", 3);
+ }
+
+ const BodyPipePointer &vp = theVirginRep.raw().body_pipe;
+ if (vp != NULL && vp->stillConsuming(this))
+ buf.append("Vb ", 3);
+ else
+ buf.append("V. ", 3);
+
+ buf.Printf(" ecapx%d]", id);
+
+ buf.terminate();
+
+ return buf.content();
}
xaction that Squid communicates with. One eCAP module may register many
eCAP xactions. */
class XactionRep : public Adaptation::Initiate, public libecap::host::Xaction,
- public BodyProducer, public BodyConsumer
+ public BodyConsumer, public BodyProducer
{
public:
XactionRep(Adaptation::Initiator *anInitiator, HttpMsg *virginHeader, HttpRequest *virginCause, const Adaptation::ServicePointer &service);
void master(const AdapterXaction &aMaster); // establish a link
// libecap::host::Xaction API
- virtual libecap::Message &virgin(); // request or response
- virtual const libecap::Message *cause(); // request for the above response
- virtual void useVirgin(); // final answer: no adaptation
- virtual void adaptVirgin(); // adapted message starts as virgin
- virtual void adaptNewRequest(); // make fresh adapted request
- virtual void adaptNewResponse(); // make fresh adapted response
- virtual libecap::Message *adapted(); // request or response
- virtual void useAdapted(); // final answer: adapted msg is ready
- virtual void useNone(); // final answer: no answer
+ virtual libecap::Message &virgin();
+ virtual const libecap::Message &cause();
+ virtual libecap::Message &adapted();
+ virtual void useVirgin();
+ virtual void useAdapted(const libecap::shared_ptr<libecap::Message> &msg);
+ virtual void adaptationDelayed(const libecap::Delay &);
+ virtual void adaptationAborted();
+ virtual void adapterWontConsume();
+ virtual void adapterWillConsume();
+ virtual void adapterDoneConsuming();
+ virtual void consumeVirgin(size_type n);
+ virtual void pauseVirginProduction();
+ virtual void resumeVirginProduction();
+ virtual void setAdaptedBodySize(const libecap::BodySize &size);
+ virtual void appendAdapted(const libecap::Area &area);
+ virtual void noteAdaptedBodyEnd();
+
+ // libecap::Callable API, via libecap::host::Xaction
+ virtual bool callable() const;
// BodyProducer API
virtual void noteMoreBodySpaceAvailable(RefCount<BodyPipe> bp);
// AsyncJob API (via Initiate)
virtual void start();
- virtual void swangSong();
+ virtual bool doneAll() const;
+ virtual void swanSong();
virtual const char *status() const;
protected:
+ Adaptation::Message &answer();
+
+ bool sendingVirgin() const;
+ void dropVirgin(const char *reason);
+ bool doneWithAdapted() const;
+
void terminateMaster();
+ void scheduleStop(const char *reason);
private:
AdapterXaction theMaster; // the actual adaptation xaction we represent
- Adaptation::Message theVirgin;
- Adaptation::Message theCause;
- Adaptation::Message theAnswer;
MessageRep theVirginRep;
MessageRep *theCauseRep;
- MessageRep *theAnswerRep;
+
+ typedef libecap::shared_ptr<libecap::Message> MessagePtr;
+ MessagePtr theAnswerRep;
CBDATA_CLASS2(XactionRep);
};