#include "squid.h"
+#include "base/AsyncJobCalls.h"
#include "BodyPipe.h"
#include "TextException.h"
public:
typedef UnaryMemFunT<BodyProducer, BodyPipe::Pointer> Parent;
- BodyProducerDialer(BodyProducer *aProducer, Parent::Method aHandler,
- BodyPipe::Pointer bp): Parent(aProducer, aHandler, bp) {}
+ BodyProducerDialer(const BodyProducer::Pointer &aProducer,
+ Parent::Method aHandler, BodyPipe::Pointer bp):
+ Parent(aProducer, aHandler, bp) {}
virtual bool canDial(AsyncCall &call);
};
public:
typedef UnaryMemFunT<BodyConsumer, BodyPipe::Pointer> Parent;
- BodyConsumerDialer(BodyConsumer *aConsumer, Parent::Method aHandler,
- BodyPipe::Pointer bp): Parent(aConsumer, aHandler, bp) {}
+ BodyConsumerDialer(const BodyConsumer::Pointer &aConsumer,
+ Parent::Method aHandler, BodyPipe::Pointer bp):
+ Parent(aConsumer, aHandler, bp) {}
virtual bool canDial(AsyncCall &call);
};
if (!Parent::canDial(call))
return false;
- BodyProducer *producer = object;
+ const BodyProducer::Pointer &producer = job;
BodyPipe::Pointer pipe = arg1;
if (!pipe->stillProducing(producer)) {
debugs(call.debugSection, call.debugLevel, HERE << producer <<
if (!Parent::canDial(call))
return false;
- BodyConsumer *consumer = object;
+ const BodyConsumer::Pointer &consumer = job;
BodyPipe::Pointer pipe = arg1;
if (!pipe->stillConsuming(consumer)) {
debugs(call.debugSection, call.debugLevel, HERE << consumer <<
void
BodyPipe::clearProducer(bool atEof)
{
- if (theProducer) {
+ if (theProducer.set()) {
debugs(91,7, HERE << "clearing BodyPipe producer" << status());
- theProducer = NULL;
+ theProducer.clear();
if (atEof) {
if (!bodySizeKnown())
theBodySize = thePutSize;
}
bool
-BodyPipe::setConsumerIfNotLate(Consumer *aConsumer)
+BodyPipe::setConsumerIfNotLate(const Consumer::Pointer &aConsumer)
{
assert(!theConsumer);
- assert(aConsumer);
+ assert(aConsumer.set()); // but might be invalid
// TODO: convert this into an exception and remove IfNotLate suffix
// If there is something consumed already, we are in an auto-consuming mode
void
BodyPipe::clearConsumer()
{
- if (theConsumer) {
+ if (theConsumer.set()) {
debugs(91,7, HERE << "clearing consumer" << status());
- theConsumer = NULL;
+ theConsumer.clear();
if (consumedSize() && !exhausted()) {
AsyncCall::Pointer call= asyncCall(91, 7,
"BodyProducer::noteBodyConsumerAborted",
void
BodyPipe::scheduleBodyDataNotification()
{
- if (theConsumer) {
+ if (theConsumer.valid()) { // TODO: allow asyncCall() to check this instead
AsyncCall::Pointer call = asyncCall(91, 7,
"BodyConsumer::noteMoreBodyDataAvailable",
BodyConsumerDialer(theConsumer,
void
BodyPipe::scheduleBodyEndNotification()
{
- if (theConsumer) {
+ if (theConsumer.valid()) { // TODO: allow asyncCall() to check this instead
if (bodySizeKnown() && bodySize() == thePutSize) {
AsyncCall::Pointer call = asyncCall(91, 7,
"BodyConsumer::noteBodyProductionEnded",
outputBuffer.Printf(" %d+%d", (int)theBuf.contentSize(), (int)theBuf.spaceSize());
outputBuffer.Printf(" pipe%p", this);
- if (theProducer)
- outputBuffer.Printf(" prod%p", theProducer);
- if (theConsumer)
- outputBuffer.Printf(" cons%p", theConsumer);
+ if (theProducer.set())
+ outputBuffer.Printf(" prod%p", theProducer.get());
+ if (theConsumer.set())
+ outputBuffer.Printf(" cons%p", theConsumer.get());
if (mustAutoConsume)
outputBuffer.append(" A", 2);
#define SQUID_BODY_PIPE_H
#include "MemBuf.h"
-#include "base/AsyncCall.h"
#include "base/AsyncJob.h"
+#include "base/CbcPointer.h"
class BodyPipe;
class BodyProducer: virtual public AsyncJob
{
public:
+ typedef CbcPointer<BodyProducer> Pointer;
+
BodyProducer():AsyncJob("BodyProducer") {}
virtual ~BodyProducer() {}
class BodyConsumer: virtual public AsyncJob
{
public:
+ typedef CbcPointer<BodyConsumer> Pointer;
+
BodyConsumer():AsyncJob("BodyConsumer") {}
virtual ~BodyConsumer() {}
bool mayNeedMoreData() const { return !bodySizeKnown() || needsMoreData(); }
bool needsMoreData() const { return bodySizeKnown() && unproducedSize() > 0; }
uint64_t unproducedSize() const; // size of still unproduced data
- bool stillProducing(const Producer *producer) const { return theProducer == producer; }
+ bool stillProducing(const Producer::Pointer &producer) const { return theProducer == producer; }
// called by consumers
- bool setConsumerIfNotLate(Consumer *aConsumer);
+ bool setConsumerIfNotLate(const Consumer::Pointer &aConsumer);
void clearConsumer(); // aborts if still piping
size_t getMoreData(MemBuf &buf);
void consume(size_t size);
bool expectMoreAfter(uint64_t offset) const;
bool exhausted() const; // saw eof/abort and all data consumed
- bool stillConsuming(const Consumer *consumer) const { return theConsumer == consumer; }
+ bool stillConsuming(const Consumer::Pointer &consumer) const { return theConsumer == consumer; }
// start or continue consuming when there is no consumer
void enableAutoConsumption();
private:
int64_t theBodySize; // expected total content length, if known
- Producer *theProducer; // content producer, if any
- Consumer *theConsumer; // content consumer, if any
+ Producer::Pointer theProducer; // content producer, if any
+ Consumer::Pointer theConsumer; // content consumer, if any
uint64_t thePutSize; // ever-increasing total
uint64_t theGetSize; // ever-increasing total
// All job dialers with comm parameters are merged into one since they
// all have exactly one callback argument and differ in Params type only
template <class C, class Params_>
-class CommCbMemFunT: public JobDialer, public CommDialerParamsT<Params_>
+class CommCbMemFunT: public JobDialer<C>, public CommDialerParamsT<Params_>
{
public:
typedef Params_ Params;
typedef void (C::*Method)(const Params &io);
- CommCbMemFunT(C *obj, Method meth): JobDialer(obj),
- CommDialerParamsT<Params>(obj), object(obj), method(meth) {}
+ CommCbMemFunT(const CbcPointer<C> &job, Method meth): JobDialer<C>(job),
+ CommDialerParamsT<Params_>(job.get()),
+ method(meth) {}
virtual bool canDial(AsyncCall &c) {
- return JobDialer::canDial(c) &&
+ return JobDialer<C>::canDial(c) &&
this->params.syncWithComm();
}
}
public:
- C *object;
Method method;
protected:
- virtual void doDial() { (object->*method)(this->params); }
+ virtual void doDial() { ((&(*this->job))->*method)(this->params); }
};
if (requestBodySource->getMoreData(buf)) {
debugs(9,3, HERE << "will write " << buf.contentSize() << " request body bytes");
typedef CommCbMemFunT<ServerStateData, CommIoCbParams> Dialer;
- requestSender = asyncCall(93,3, "ServerStateData::sentRequestBody",
- Dialer(this, &ServerStateData::sentRequestBody));
+ requestSender = JobCallback(93,3,
+ Dialer, this, ServerStateData::sentRequestBody);
comm_write_mbuf(fd, &buf, requestSender);
} else {
debugs(9,3, HERE << "will wait for more request body bytes or eof");
}
adaptedHeadSource = initiateAdaptation(
- new Adaptation::Iterator(this, vrep, cause, group));
- startedAdaptation = adaptedHeadSource != NULL;
+ new Adaptation::Iterator(vrep, cause, group));
+ startedAdaptation = initiated(adaptedHeadSource);
Must(startedAdaptation);
}
#if USE_ADAPTATION
BodyPipe::Pointer virginBodyDestination; /**< to provide virgin response body */
- Adaptation::Initiate *adaptedHeadSource; /**< to get adapted response headers */
+ CbcPointer<Adaptation::Initiate> adaptedHeadSource; /**< to get adapted response headers */
BodyPipe::Pointer adaptedBodySource; /**< to consume adated response body */
bool adaptationAccessCheckPending;
if (Config::Enabled) {
// the new check will call the callback and delete self, eventually
- return AsyncStart(new AccessCheck(
- ServiceFilter(method, vp, req, rep), cb, cbdata));
+ AsyncJob::Start(new AccessCheck( // we do not store so not a CbcPointer
+ ServiceFilter(method, vp, req, rep), cb, cbdata));
+ return true;
}
debugs(83, 3, HERE << "adaptation off, skipping");
#include "HttpMsg.h"
#include "adaptation/Initiator.h"
#include "adaptation/Initiate.h"
+#include "base/AsyncJobCalls.h"
namespace Adaptation
{
public:
typedef UnaryMemFunT<Initiator, HttpMsg*> Parent;
- AnswerDialer(Initiator *obj, Parent::Method meth, HttpMsg *msg):
- Parent(obj, meth, msg) { HTTPMSGLOCK(arg1); }
- AnswerDialer(const AnswerDialer &d):
- Parent(d) { HTTPMSGLOCK(arg1); }
+ AnswerDialer(const Parent::JobPointer &job, Parent::Method meth,
+ HttpMsg *msg): Parent(job, meth, msg) { HTTPMSGLOCK(arg1); }
+ AnswerDialer(const AnswerDialer &d): Parent(d) { HTTPMSGLOCK(arg1); }
virtual ~AnswerDialer() { HTTPMSGUNLOCK(arg1); }
+
+private:
+ AnswerDialer &operator =(const AnswerDialer &); // not implemented
};
} // namespace Adaptation
/* Initiate */
-Adaptation::Initiate::Initiate(const char *aTypeName, Initiator *anInitiator):
- AsyncJob(aTypeName), theInitiator(anInitiator)
+Adaptation::Initiate::Initiate(const char *aTypeName): AsyncJob(aTypeName)
{
- assert(theInitiator);
}
Adaptation::Initiate::~Initiate()
// can assert(!(wasStarted && theInitiator)).
}
+void
+Adaptation::Initiate::initiator(const CbcPointer<Initiator> &i)
+{
+ Must(!theInitiator);
+ Must(i.valid());
+ theInitiator = i;
+}
+
+
// internal cleanup
void Adaptation::Initiate::swanSong()
{
debugs(93, 5, HERE << "swan sings" << status());
- if (theInitiator) {
+ if (theInitiator.set()) {
debugs(93, 3, HERE << "fatal failure; sending abort notification");
tellQueryAborted(true); // final by default
}
void Adaptation::Initiate::clearInitiator()
{
- if (theInitiator)
- theInitiator.clear();
+ theInitiator.clear();
}
void Adaptation::Initiate::sendAnswer(HttpMsg *msg)
{
assert(msg);
- if (theInitiator.isThere()) {
- CallJob(93, 5, __FILE__, __LINE__, "Initiator::noteAdaptAnswer",
- AnswerDialer(theInitiator.ptr(), &Initiator::noteAdaptationAnswer, msg));
- }
+ CallJob(93, 5, __FILE__, __LINE__, "Initiator::noteAdaptationAnswer",
+ AnswerDialer(theInitiator, &Initiator::noteAdaptationAnswer, msg));
clearInitiator();
}
void Adaptation::Initiate::tellQueryAborted(bool final)
{
- if (theInitiator.isThere()) {
- CallJobHere1(93, 5, theInitiator.ptr(),
- Initiator::noteAdaptationQueryAbort, final);
- }
+ CallJobHere1(93, 5, theInitiator,
+ Initiator, noteAdaptationQueryAbort, final);
clearInitiator();
}
{
return AsyncJob::status(); // for now
}
-
-
-/* InitiatorHolder */
-
-Adaptation::InitiatorHolder::InitiatorHolder(Initiator *anInitiator):
- prime(0), cbdata(0)
-{
- if (anInitiator) {
- cbdata = cbdataReference(anInitiator->toCbdata());
- prime = anInitiator;
- }
-}
-
-Adaptation::InitiatorHolder::InitiatorHolder(const InitiatorHolder &anInitiator):
- prime(0), cbdata(0)
-{
- if (anInitiator != NULL && cbdataReferenceValid(anInitiator.cbdata)) {
- cbdata = cbdataReference(anInitiator.cbdata);
- prime = anInitiator.prime;
- }
-}
-
-Adaptation::InitiatorHolder::~InitiatorHolder()
-{
- clear();
-}
-
-void Adaptation::InitiatorHolder::clear()
-{
- if (prime) {
- prime = NULL;
- cbdataReferenceDone(cbdata);
- }
-}
-
-Adaptation::Initiator *Adaptation::InitiatorHolder::ptr()
-{
- assert(isThere());
- return prime;
-}
-
-bool
-Adaptation::InitiatorHolder::isThere()
-{
- return prime && cbdataReferenceValid(cbdata);
-}
-
-// should not be used
-Adaptation::InitiatorHolder &
-Adaptation::InitiatorHolder::operator =(const InitiatorHolder &anInitiator)
-{
- assert(false);
- return *this;
-}
#ifndef SQUID_ADAPTATION__INITIATE_H
#define SQUID_ADAPTATION__INITIATE_H
-#include "base/AsyncCall.h"
#include "base/AsyncJob.h"
+#include "base/CbcPointer.h"
#include "adaptation/forward.h"
class HttpMsg;
namespace Adaptation
{
-/* Initiator holder associtates an initiator with its cbdata. It is used as
- * a temporary hack to make cbdata work with multiple inheritance. We need
- * this hack because we cannot know whether the initiator pointer is still
- * valid without dereferencing it to call toCbdata()
- * TODO: JobDialer uses the same trick. Factor out or move this code. */
-class InitiatorHolder
-{
-public:
- InitiatorHolder(Initiator *anInitiator);
- InitiatorHolder(const InitiatorHolder &anInitiator);
- ~InitiatorHolder();
-
- void clear();
-
- // to make comparison with NULL possible
- operator void*() { return prime; }
- bool operator == (void *) const { return prime == NULL; }
- bool operator != (void *) const { return prime != NULL; }
- bool operator !() const { return !prime; }
-
- bool isThere(); // we have a valid initiator pointer
- Initiator *ptr(); // asserts isThere()
- void *theCbdata() { return cbdata;}
-
-private:
- InitiatorHolder &operator =(const InitiatorHolder &anInitiator);
-
- Initiator *prime;
- void *cbdata;
-};
-
/*
* The Initiate is a common base for queries or transactions
* initiated by an Initiator. This interface exists to allow an
{
public:
- Initiate(const char *aTypeName, Initiator *anInitiator);
+ Initiate(const char *aTypeName);
virtual ~Initiate();
+ void initiator(const CbcPointer<Initiator> &i); ///< sets initiator
+
// communication with the initiator
virtual void noteInitiatorAborted() = 0;
virtual const char *status() const; // for debugging
- InitiatorHolder theInitiator;
+ CbcPointer<Initiator> theInitiator;
private:
Initiate(const Initiate &); // no definition
#include "squid.h"
#include "adaptation/Initiate.h"
#include "adaptation/Initiator.h"
+#include "base/AsyncJobCalls.h"
-Adaptation::Initiate *
-Adaptation::Initiator::initiateAdaptation(Adaptation::Initiate *x)
+CbcPointer<Adaptation::Initiate>
+Adaptation::Initiator::initiateAdaptation(Initiate *x)
{
- if ((x = dynamic_cast<Initiate*>(Initiate::AsyncStart(x))))
- x = cbdataReference(x);
- return x;
+ CbcPointer<Initiate> i(x);
+ x->initiator(this);
+ Start(x);
+ return i;
}
void
-Adaptation::Initiator::clearAdaptation(Initiate *&x)
+Adaptation::Initiator::clearAdaptation(CbcPointer<Initiate> &x)
{
- assert(x);
- cbdataReferenceDone(x);
+ x.clear();
}
void
-Adaptation::Initiator::announceInitiatorAbort(Initiate *&x)
+Adaptation::Initiator::announceInitiatorAbort(CbcPointer<Initiate> &x)
{
- if (x) {
- CallJobHere(93, 5, x, Initiate::noteInitiatorAborted);
- clearAdaptation(x);
- }
+ CallJobHere(93, 5, x, Initiate, noteInitiatorAborted);
+ clearAdaptation(x);
}
#define SQUID_ADAPTATION__INITIATOR_H
#include "base/AsyncJob.h"
+#include "base/CbcPointer.h"
#include "adaptation/forward.h"
/*
virtual void noteAdaptationQueryAbort(bool final) = 0;
protected:
- Initiate *initiateAdaptation(Initiate *x); // locks and returns x
+ ///< starts freshly created initiate and returns a safe pointer to it
+ CbcPointer<Initiate> initiateAdaptation(Initiate *x);
- // done with x (and not calling announceInitiatorAbort)
- void clearAdaptation(Initiate *&x); // unlocks x
+ /// clears the pointer (does not call announceInitiatorAbort)
+ void clearAdaptation(CbcPointer<Initiate> &x);
- // inform the transaction about abnormal termination and clear it
- void announceInitiatorAbort(Initiate *&x); // unlocks x
+ /// inform the transaction about abnormal termination and clear the pointer
+ void announceInitiatorAbort(CbcPointer<Initiate> &x);
+
+ /// Must(initiated(initiate)) instead of Must(initiate.set()), for clarity
+ bool initiated(const CbcPointer<AsyncJob> &job) const { return job.set(); }
};
} // namespace Adaptation
#include "adaptation/ServiceGroups.h"
-Adaptation::Iterator::Iterator(Adaptation::Initiator *anInitiator,
- HttpMsg *aMsg, HttpRequest *aCause,
- const ServiceGroupPointer &aGroup):
+Adaptation::Iterator::Iterator(
+ HttpMsg *aMsg, HttpRequest *aCause,
+ const ServiceGroupPointer &aGroup):
AsyncJob("Iterator"),
- Adaptation::Initiate("Iterator", anInitiator),
+ Adaptation::Initiate("Iterator"),
theGroup(aGroup),
theMsg(HTTPMSGLOCK(aMsg)),
theCause(aCause ? HTTPMSGLOCK(aCause) : NULL),
debugs(93,5, HERE << "using adaptation service: " << service->cfg().key);
theLauncher = initiateAdaptation(
- service->makeXactLauncher(this, theMsg, theCause));
- Must(theLauncher);
+ service->makeXactLauncher(theMsg, theCause));
+ Must(initiated(theLauncher));
Must(!done());
}
void Adaptation::Iterator::swanSong()
{
- if (theInitiator)
+ if (theInitiator.set())
tellQueryAborted(true); // abnormal condition that should not happen
- if (theLauncher)
+ if (initiated(theLauncher))
clearAdaptation(theLauncher);
Adaptation::Initiate::swanSong();
class Iterator: public Initiate, public Initiator
{
public:
- Iterator(Adaptation::Initiator *anInitiator,
- HttpMsg *virginHeader, HttpRequest *virginCause,
+ Iterator(HttpMsg *virginHeader, HttpRequest *virginCause,
const Adaptation::ServiceGroupPointer &aGroup);
virtual ~Iterator();
ServicePlan thePlan; ///< which services to use and in what order
HttpMsg *theMsg; ///< the message being adapted (virgin for each step)
HttpRequest *theCause; ///< the cause of the original virgin message
- Adaptation::Initiate *theLauncher; ///< current transaction launcher
+ CbcPointer<Adaptation::Initiate> theLauncher; ///< current transaction launcher
int iterations; ///< number of steps initiated
bool adapted; ///< whether the virgin message has been replaced
virtual bool broken() const;
virtual bool up() const = 0; // see comments above
- virtual Initiate *makeXactLauncher(Initiator *, HttpMsg *virginHeader, HttpRequest *virginCause) = 0;
+ virtual Initiate *makeXactLauncher(HttpMsg *virginHeader, HttpRequest *virginCause) = 0;
typedef void Callback(void *data, Pointer &service);
void callWhenReady(Callback *cb, void *data);
}
Adaptation::Initiate *
-Adaptation::Ecap::ServiceRep::makeXactLauncher(Adaptation::Initiator *initiator,
- HttpMsg *virgin, HttpRequest *cause)
+Adaptation::Ecap::ServiceRep::makeXactLauncher(HttpMsg *virgin,
+ HttpRequest *cause)
{
Must(up());
- XactionRep *rep = new XactionRep(initiator, virgin, cause, Pointer(this));
+ XactionRep *rep = new XactionRep(virgin, cause, Pointer(this));
XactionRep::AdapterXaction x(theService->makeXaction(rep));
rep->master(x);
return rep;
virtual bool probed() const;
virtual bool up() const;
- Adaptation::Initiate *makeXactLauncher(Adaptation::Initiator *, HttpMsg *virginHeader, HttpRequest *virginCause);
+ Adaptation::Initiate *makeXactLauncher(HttpMsg *virginHeader, HttpRequest *virginCause);
// the methods below can only be called on an up() service
virtual bool wantsUrl(const String &urlPath) const;
CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Ecap::XactionRep, XactionRep);
-Adaptation::Ecap::XactionRep::XactionRep(Adaptation::Initiator *anInitiator,
- HttpMsg *virginHeader, HttpRequest *virginCause,
- const Adaptation::ServicePointer &aService):
+Adaptation::Ecap::XactionRep::XactionRep(
+ HttpMsg *virginHeader, HttpRequest *virginCause,
+ const Adaptation::ServicePointer &aService):
AsyncJob("Adaptation::Ecap::XactionRep"),
- Adaptation::Initiate("Adaptation::Ecap::XactionRep", anInitiator),
+ Adaptation::Initiate("Adaptation::Ecap::XactionRep"),
theService(aService),
theVirginRep(virginHeader), theCauseRep(NULL),
proxyingVb(opUndecided), proxyingAb(opUndecided),
public BodyConsumer, public BodyProducer
{
public:
- XactionRep(Adaptation::Initiator *anInitiator, HttpMsg *virginHeader, HttpRequest *virginCause, const Adaptation::ServicePointer &service);
+ XactionRep(HttpMsg *virginHeader, HttpRequest *virginCause, const Adaptation::ServicePointer &service);
virtual ~XactionRep();
typedef libecap::shared_ptr<libecap::adapter::Xaction> AdapterXaction;
Adaptation::Icap::Launcher::Launcher(const char *aTypeName,
- Adaptation::Initiator *anInitiator, Adaptation::ServicePointer &aService):
+ Adaptation::ServicePointer &aService):
AsyncJob(aTypeName),
- Adaptation::Initiate(aTypeName, anInitiator),
+ Adaptation::Initiate(aTypeName),
theService(aService), theXaction(0), theLaunches(0)
{
}
{
Adaptation::Initiate::start();
- Must(theInitiator);
+ Must(theInitiator.set());
launchXaction("first");
}
if (theLaunches >= TheConfig.repeat_limit)
x->disableRepeats("over icap_retry_limit");
theXaction = initiateAdaptation(x);
- Must(theXaction);
+ Must(initiated(theXaction));
}
void Adaptation::Icap::Launcher::noteAdaptationAnswer(HttpMsg *message)
Must(done()); // swanSong will notify the initiator
}
-void Adaptation::Icap::Launcher::noteXactAbort(XactAbortInfo &info)
+void Adaptation::Icap::Launcher::noteXactAbort(XactAbortInfo info)
{
debugs(93,5, HERE << "theXaction:" << theXaction << " launches: " << theLaunches);
void Adaptation::Icap::Launcher::swanSong()
{
- if (theInitiator)
+ if (theInitiator.set())
tellQueryAborted(true); // always final here because abnormal
- if (theXaction)
+ if (theXaction.set())
clearAdaptation(theXaction);
Adaptation::Initiate::swanSong();
class Launcher: public Adaptation::Initiate, public Adaptation::Initiator
{
public:
- Launcher(const char *aTypeName, Adaptation::Initiator *anInitiator, Adaptation::ServicePointer &aService);
+ Launcher(const char *aTypeName, Adaptation::ServicePointer &aService);
virtual ~Launcher();
// Adaptation::Initiate: asynchronous communication with the initiator
// Adaptation::Initiator: asynchronous communication with the current transaction
virtual void noteAdaptationAnswer(HttpMsg *message);
- virtual void noteXactAbort(XactAbortInfo &info);
+ virtual void noteXactAbort(XactAbortInfo info);
private:
bool canRetry(XactAbortInfo &info) const; //< true if can retry in the case of persistent connection failures
void launchXaction(const char *xkind);
Adaptation::ServicePointer theService; ///< ICAP service for all launches
- Adaptation::Initiate *theXaction; ///< current ICAP transaction
+ CbcPointer<Initiate> theXaction; ///< current ICAP transaction
int theLaunches; // the number of transaction launches
};
XactAbortInfo(const XactAbortInfo &);
~XactAbortInfo();
+ std::ostream &print(std::ostream &os) const {
+ return os << isRetriable << ',' << isRepeatable;
+ }
+
HttpRequest *icapRequest;
HttpReply *icapReply;
bool isRetriable;
XactAbortInfo &operator =(const XactAbortInfo &); // undefined
};
-/* required by UnaryMemFunT */
-inline std::ostream &operator << (std::ostream &os, Adaptation::Icap::XactAbortInfo info)
+inline
+std::ostream &
+operator <<(std::ostream &os, const XactAbortInfo &xai)
{
- // Nothing, it is unused
- return os;
+ return xai.print(os);
}
-/// A Dialer class used to schedule the Adaptation::Icap::Launcher::noteXactAbort call
-class XactAbortCall: public UnaryMemFunT<Adaptation::Icap::Launcher, Adaptation::Icap::XactAbortInfo>
-{
-public:
- typedef void (Adaptation::Icap::Launcher::*DialMethod)(Adaptation::Icap::XactAbortInfo &);
- XactAbortCall(Adaptation::Icap::Launcher *launcer, DialMethod aMethod,
- const Adaptation::Icap::XactAbortInfo &info):
- UnaryMemFunT<Adaptation::Icap::Launcher, Adaptation::Icap::XactAbortInfo>(launcer, NULL, info),
- dialMethod(aMethod) {}
- virtual void print(std::ostream &os) const { os << '(' << "retriable:" << arg1.isRetriable << ", repeatable:" << arg1.isRepeatable << ')'; }
-
-public:
- DialMethod dialMethod;
-
-protected:
- virtual void doDial() { (object->*dialMethod)(arg1); }
-};
-
} // namespace Icap
} // namespace Adaptation
memset(this, 0, sizeof(*this));
}
-Adaptation::Icap::ModXact::ModXact(Adaptation::Initiator *anInitiator, HttpMsg *virginHeader,
+Adaptation::Icap::ModXact::ModXact(HttpMsg *virginHeader,
HttpRequest *virginCause, Adaptation::Icap::ServiceRep::Pointer &aService):
AsyncJob("Adaptation::Icap::ModXact"),
- Adaptation::Icap::Xaction("Adaptation::Icap::ModXact", anInitiator, aService),
+ Adaptation::Icap::Xaction("Adaptation::Icap::ModXact", aService),
virginConsumed(0),
bodyParser(NULL),
canStartBypass(false), // too early
Must(!state.serviceWaiting);
debugs(93, 7, HERE << "will wait for the ICAP service" << status());
state.serviceWaiting = true;
- AsyncCall::Pointer call = asyncCall(93,5, "Adaptation::Icap::ModXact::noteServiceReady",
- MemFun(this, &Adaptation::Icap::ModXact::noteServiceReady));
+ typedef NullaryMemFunT<ModXact> Dialer;
+ AsyncCall::Pointer call = JobCallback(93,5,
+ Dialer, this, Adaptation::Icap::ModXact::noteServiceReady);
service().callWhenReady(call);
}
/* Adaptation::Icap::ModXactLauncher */
-Adaptation::Icap::ModXactLauncher::ModXactLauncher(Adaptation::Initiator *anInitiator, HttpMsg *virginHeader, HttpRequest *virginCause, Adaptation::ServicePointer aService):
+Adaptation::Icap::ModXactLauncher::ModXactLauncher(HttpMsg *virginHeader, HttpRequest *virginCause, Adaptation::ServicePointer aService):
AsyncJob("Adaptation::Icap::ModXactLauncher"),
- Adaptation::Icap::Launcher("Adaptation::Icap::ModXactLauncher", anInitiator, aService)
+ Adaptation::Icap::Launcher("Adaptation::Icap::ModXactLauncher", aService)
{
virgin.setHeader(virginHeader);
virgin.setCause(virginCause);
Adaptation::Icap::ServiceRep::Pointer s =
dynamic_cast<Adaptation::Icap::ServiceRep*>(theService.getRaw());
Must(s != NULL);
- return new Adaptation::Icap::ModXact(this, virgin.header, virgin.cause, s);
+ return new Adaptation::Icap::ModXact(virgin.header, virgin.cause, s);
}
void Adaptation::Icap::ModXactLauncher::swanSong()
{
public:
- ModXact(Adaptation::Initiator *anInitiator, HttpMsg *virginHeader, HttpRequest *virginCause, ServiceRep::Pointer &s);
+ ModXact(HttpMsg *virginHeader, HttpRequest *virginCause, ServiceRep::Pointer &s);
// BodyProducer methods
virtual void noteMoreBodySpaceAvailable(BodyPipe::Pointer);
InOut virgin;
InOut adapted;
-protected:
// bypasses exceptions if needed and possible
virtual void callException(const std::exception &e);
class ModXactLauncher: public Launcher
{
public:
- ModXactLauncher(Adaptation::Initiator *anInitiator, HttpMsg *virginHeader, HttpRequest *virginCause, Adaptation::ServicePointer s);
+ ModXactLauncher(HttpMsg *virginHeader, HttpRequest *virginCause, Adaptation::ServicePointer s);
protected:
virtual Xaction *createXaction();
CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Icap, OptXactLauncher);
-Adaptation::Icap::OptXact::OptXact(Adaptation::Initiator *anInitiator, Adaptation::Icap::ServiceRep::Pointer &aService):
+Adaptation::Icap::OptXact::OptXact(Adaptation::Icap::ServiceRep::Pointer &aService):
AsyncJob("Adaptation::Icap::OptXact"),
- Adaptation::Icap::Xaction("Adaptation::Icap::OptXact", anInitiator, aService)
+ Adaptation::Icap::Xaction("Adaptation::Icap::OptXact", aService)
{
}
/* Adaptation::Icap::OptXactLauncher */
-Adaptation::Icap::OptXactLauncher::OptXactLauncher(Adaptation::Initiator *anInitiator, Adaptation::ServicePointer aService):
+Adaptation::Icap::OptXactLauncher::OptXactLauncher(Adaptation::ServicePointer aService):
AsyncJob("Adaptation::Icap::OptXactLauncher"),
- Adaptation::Icap::Launcher("Adaptation::Icap::OptXactLauncher", anInitiator, aService)
+ Adaptation::Icap::Launcher("Adaptation::Icap::OptXactLauncher", aService)
{
}
Adaptation::Icap::ServiceRep::Pointer s =
dynamic_cast<Adaptation::Icap::ServiceRep*>(theService.getRaw());
Must(s != NULL);
- return new Adaptation::Icap::OptXact(this, s);
+ return new Adaptation::Icap::OptXact(s);
}
{
public:
- OptXact(Adaptation::Initiator *anInitiator, ServiceRep::Pointer &aService);
+ OptXact(ServiceRep::Pointer &aService);
protected:
virtual void start();
class OptXactLauncher: public Launcher
{
public:
- OptXactLauncher(Adaptation::Initiator *anInitiator, Adaptation::ServicePointer aService);
+ OptXactLauncher(Adaptation::ServicePointer aService);
protected:
virtual Xaction *createXaction();
if (!detached())
updateScheduled = false;
- if (detached() || theOptionsFetcher) {
+ if (detached() || theOptionsFetcher.set()) {
debugs(93,5, HERE << "ignores options update " << status());
return;
}
i.callback = cb;
theClients.push_back(i);
- if (theOptionsFetcher || notifying)
+ if (theOptionsFetcher.set() || notifying)
return; // do nothing, we will be picked up in noteTimeToNotify()
if (needNewOptions())
void Adaptation::Icap::ServiceRep::scheduleNotification()
{
debugs(93,7, HERE << "will notify " << theClients.size() << " clients");
- CallJobHere(93, 5, this, Adaptation::Icap::ServiceRep::noteTimeToNotify);
+ CallJobHere(93, 5, this, Adaptation::Icap::ServiceRep, noteTimeToNotify);
}
bool Adaptation::Icap::ServiceRep::needNewOptions() const
// we are receiving ICAP OPTIONS response headers here or NULL on failures
void Adaptation::Icap::ServiceRep::noteAdaptationAnswer(HttpMsg *msg)
{
- Must(theOptionsFetcher);
+ Must(initiated(theOptionsFetcher));
clearAdaptation(theOptionsFetcher);
Must(msg);
void Adaptation::Icap::ServiceRep::noteAdaptationQueryAbort(bool)
{
- Must(theOptionsFetcher);
+ Must(initiated(theOptionsFetcher));
clearAdaptation(theOptionsFetcher);
debugs(93,3, HERE << "failed to fetch options " << status());
handleNewOptions(0);
}
+// we (a) must keep trying to get OPTIONS and (b) are RefCounted so we
+// must keep our job alive (XXX: until nobody needs us)
+void Adaptation::Icap::ServiceRep::callException(const std::exception &e)
+{
+ clearAdaptation(theOptionsFetcher);
+ debugs(93,2, "ICAP probably failed to fetch options (" << e.what() <<
+ ")" << status());
+ handleNewOptions(0);
+}
+
void Adaptation::Icap::ServiceRep::handleNewOptions(Adaptation::Icap::Options *newOptions)
{
// new options may be NULL
Must(!theOptionsFetcher);
debugs(93,6, HERE << "will get new options " << status());
- // XXX: second "this" is "self"; this works but may stop if API changes
- theOptionsFetcher = initiateAdaptation(new Adaptation::Icap::OptXactLauncher(this, this));
- Must(theOptionsFetcher);
+ // XXX: "this" here is "self"; works until refcounting API changes
+ theOptionsFetcher = initiateAdaptation(
+ new Adaptation::Icap::OptXactLauncher(this));
// TODO: timeout in case Adaptation::Icap::OptXact never calls us back?
// Such a timeout should probably be a generic AsyncStart feature.
}
}
Adaptation::Initiate *
-Adaptation::Icap::ServiceRep::makeXactLauncher(Adaptation::Initiator *initiator,
- HttpMsg *virgin, HttpRequest *cause)
+Adaptation::Icap::ServiceRep::makeXactLauncher(HttpMsg *virgin,
+ HttpRequest *cause)
{
- return new Adaptation::Icap::ModXactLauncher(initiator, virgin, cause, this);
+ return new Adaptation::Icap::ModXactLauncher(virgin, cause, this);
}
// returns a temporary string depicting service status, for debugging
if (detached())
buf.append(",detached", 9);
- if (theOptionsFetcher)
+ if (theOptionsFetcher.set())
buf.append(",fetch", 6);
if (notifying)
virtual bool probed() const; // see comments above
virtual bool up() const; // see comments above
- virtual Adaptation::Initiate *makeXactLauncher(Adaptation::Initiator *, HttpMsg *virginHeader, HttpRequest *virginCause);
+ virtual Initiate *makeXactLauncher(HttpMsg *virginHeader, HttpRequest *virginCause);
void callWhenReady(AsyncCall::Pointer &cb);
//AsyncJob virtual methods
virtual bool doneAll() const { return Adaptation::Initiator::doneAll() && false;}
+ virtual void callException(const std::exception &e);
virtual void detach();
virtual bool detached() const;
Clients theClients; // all clients waiting for a call back
Options *theOptions;
- Adaptation::Initiate *theOptionsFetcher; // pending ICAP OPTIONS transaction
+ CbcPointer<Adaptation::Initiate> theOptionsFetcher; // pending ICAP OPTIONS transaction
time_t theLastUpdate; // time the options were last updated
static const int TheSessionFailureLimit;
//CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Icap, Xaction);
-Adaptation::Icap::Xaction::Xaction(const char *aTypeName, Adaptation::Initiator *anInitiator, Adaptation::Icap::ServiceRep::Pointer &aService):
+Adaptation::Icap::Xaction::Xaction(const char *aTypeName,
+ Adaptation::Icap::ServiceRep::Pointer &aService):
AsyncJob(aTypeName),
- Adaptation::Initiate(aTypeName, anInitiator),
+ Adaptation::Initiate(aTypeName),
icapRequest(NULL),
icapReply(NULL),
attempts(0),
// fake the connect callback
// TODO: can we sync call Adaptation::Icap::Xaction::noteCommConnected here instead?
typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> Dialer;
- Dialer dialer(this, &Adaptation::Icap::Xaction::noteCommConnected);
+ CbcPointer<Xaction> self(this);
+ Dialer dialer(self, &Adaptation::Icap::Xaction::noteCommConnected);
dialer.params.fd = connection;
dialer.params.flag = COMM_OK;
// fake other parameters by copying from the existing connection
// TODO: service bypass status may differ from that of a transaction
typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommTimeoutCbParams> TimeoutDialer;
- AsyncCall::Pointer timeoutCall = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout",
- TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout));
-
+ AsyncCall::Pointer timeoutCall = JobCallback(93, 5,
+ TimeoutDialer, this, Adaptation::Icap::Xaction::noteCommTimedout);
commSetTimeout(connection, TheConfig.connect_timeout(
service().cfg().bypass), timeoutCall);
typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer;
- closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed",
- CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed));
+ closer = JobCallback(93, 5,
+ CloseDialer, this, Adaptation::Icap::Xaction::noteCommClosed);
comm_add_close_handler(connection, closer);
typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> ConnectDialer;
- connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected",
- ConnectDialer(this, &Adaptation::Icap::Xaction::noteCommConnected));
+ connector = JobCallback(93,3,
+ ConnectDialer, this, Adaptation::Icap::Xaction::noteCommConnected);
commConnectStart(connection, s.cfg().host.termedBuf(), s.cfg().port, connector);
}
{
// comm module will free the buffer
typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommIoCbParams> Dialer;
- writer = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommWrote",
- Dialer(this, &Adaptation::Icap::Xaction::noteCommWrote));
+ writer = JobCallback(93,3,
+ Dialer, this, Adaptation::Icap::Xaction::noteCommWrote);
comm_write_mbuf(connection, &buf, writer);
updateTimeout();
// XXX: why does Config.Timeout lacks a write timeout?
// TODO: service bypass status may differ from that of a transaction
typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommTimeoutCbParams> TimeoutDialer;
- AsyncCall::Pointer call = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout",
- TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout));
+ AsyncCall::Pointer call = JobCallback(93,5,
+ TimeoutDialer, this, Adaptation::Icap::Xaction::noteCommTimedout);
commSetTimeout(connection,
TheConfig.io_timeout(service().cfg().bypass), call);
* here instead of reading directly into readBuf.buf.
*/
typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommIoCbParams> Dialer;
- reader = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommRead",
- Dialer(this, &Adaptation::Icap::Xaction::noteCommRead));
+ reader = JobCallback(93,3,
+ Dialer, this, Adaptation::Icap::Xaction::noteCommRead);
comm_read(connection, commBuf, readBuf.spaceSize(), reader);
updateTimeout();
void Adaptation::Icap::Xaction::noteInitiatorAborted()
{
- if (theInitiator) {
+ if (theInitiator.set()) {
clearInitiator();
mustStop("initiator aborted");
}
if (commBuf)
memFreeBuf(commBufSize, commBuf);
- if (theInitiator)
- tellQueryAborted();
+ tellQueryAborted();
maybeLog();
void Adaptation::Icap::Xaction::tellQueryAborted()
{
- Adaptation::Icap::Launcher *l = dynamic_cast<Adaptation::Icap::Launcher*>(theInitiator.ptr());
- Adaptation::Icap::XactAbortInfo abortInfo(icapRequest, icapReply, retriable(), repeatable());
- CallJob(91, 5, __FILE__, __LINE__,
- "Adaptation::Icap::Launcher::noteXactAbort",
- XactAbortCall(l, &Adaptation::Icap::Launcher::noteXactAbort, abortInfo) );
- clearInitiator();
+ if (theInitiator.set()) {
+ Adaptation::Icap::XactAbortInfo abortInfo(icapRequest, icapReply,
+ retriable(), repeatable());
+ Launcher *launcher = dynamic_cast<Launcher*>(theInitiator.get());
+ // launcher may be nil if initiator is invalid
+ CallJobHere1(91,5, CbcPointer<Launcher>(launcher),
+ Launcher, noteXactAbort, abortInfo);
+ clearInitiator();
+ }
}
{
public:
- Xaction(const char *aTypeName, Adaptation::Initiator *anInitiator, ServiceRep::Pointer &aService);
+ Xaction(const char *aTypeName, ServiceRep::Pointer &aService);
virtual ~Xaction();
void disableRetries();
// useful for debugging
virtual bool fillVirginHttpHeader(MemBuf&) const;
+public:
// custom exception handling and end-of-call checks
virtual void callException(const std::exception &e);
virtual void callEnd();
+protected:
// logging
void setOutcome(const XactOutcome &xo);
virtual void finalizeLogInfo();
*/
#include "squid.h"
+#include "base/AsyncJobCalls.h"
#include "cbdata.h"
#include "MemBuf.h"
#include "TextException.h"
unsigned int AsyncJob::TheLastId = 0;
-AsyncJob *AsyncJob::AsyncStart(AsyncJob *job)
+AsyncJob::Pointer AsyncJob::Start(AsyncJob *j)
{
- assert(job);
- CallJobHere(93, 5, job, AsyncJob::noteStart);
+ AsyncJob::Pointer job(j);
+ CallJobHere(93, 5, job, AsyncJob, start);
return job;
}
{
}
-void AsyncJob::noteStart()
-{
- start();
-}
-
void AsyncJob::start()
{
}
// there is no call wrapper waiting for our return, so we fake it
debugs(93, 5, typeName << " will delete this, reason: " << stopReason);
+ CbcPointer<AsyncJob> self(this);
AsyncCall::Pointer fakeCall = asyncCall(93,4, "FAKE-deleteThis",
- MemFun(this, &AsyncJob::deleteThis, aReason));
+ JobMemFun(self, &AsyncJob::deleteThis, aReason));
inCall = fakeCall;
callEnd();
// delete fakeCall;
}
-/* JobDialer */
-
-JobDialer::JobDialer(AsyncJob *aJob): job(NULL), lock(NULL)
-{
- if (aJob) {
- lock = cbdataReference(aJob->toCbdata());
- job = aJob;
- }
-}
-
-JobDialer::JobDialer(const JobDialer &d): CallDialer(d),
- job(NULL), lock(NULL)
-{
- if (d.lock && cbdataReferenceValid(d.lock)) {
- lock = cbdataReference(d.lock);
- Must(d.job);
- job = d.job;
- }
-}
-
-JobDialer::~JobDialer()
-{
- cbdataReferenceDone(lock); // lock may be NULL
-}
-
-
-bool
-JobDialer::canDial(AsyncCall &call)
-{
- if (!lock)
- return call.cancel("job was gone before the call");
-
- if (!cbdataReferenceValid(lock))
- return call.cancel("job gone after the call");
-
- Must(job);
- return job->canBeCalled(call);
-}
-
-void
-JobDialer::dial(AsyncCall &call)
-{
- Must(lock && cbdataReferenceValid(lock)); // canDial() checks for this
- Must(job);
-
- job->callStart(call);
-
- try {
- doDial();
- } catch (const std::exception &e) {
- debugs(call.debugSection, 3,
- HERE << call.name << " threw exception: " << e.what());
- job->callException(e);
- }
-
- job->callEnd(); // may delete job
-}
#include "base/AsyncCall.h"
#include "TextException.h"
+template <class Cbc>
+class CbcPointer;
+
/**
\defgroup AsyncJobAPI Async-Jobs API
\par
*/
/// \ingroup AsyncJobAPI
+/// Base class for all asynchronous jobs
class AsyncJob
{
-
public:
- static AsyncJob *AsyncStart(AsyncJob *job); // use this to start jobs
+ typedef CbcPointer<AsyncJob> Pointer;
+public:
AsyncJob(const char *aTypeName);
virtual ~AsyncJob();
virtual void *toCbdata() = 0;
- void noteStart(); // calls virtual start
+
+ /// starts a freshly created job (i.e., makes the job asynchronous)
+ static Pointer Start(AsyncJob *job);
protected:
// XXX: temporary method to replace "delete this" in jobs-in-transition.
static unsigned int TheLastId;
};
-
-/**
- \ingroup AsyncJobAPI
- * This is a base class for all job call dialers. It does all the job
- * dialing logic (debugging, handling exceptions, etc.) except for calling
- * the job method. The latter is not possible without templates and we
- * want to keep this class simple and template-free. Thus, we add a dial()
- * virtual method that the JobCallT template below will implement for us,
- * calling the job.
- */
-class JobDialer: public CallDialer
-{
-public:
- JobDialer(AsyncJob *aJob);
- JobDialer(const JobDialer &d);
- virtual ~JobDialer();
-
- virtual bool canDial(AsyncCall &call);
- void dial(AsyncCall &call);
-
- AsyncJob *job;
- void *lock; // job's cbdata
-
-protected:
- virtual void doDial() = 0; // actually calls the job method
-
-private:
- // not implemented and should not be needed
- JobDialer &operator =(const JobDialer &);
-};
-
-#include "base/AsyncJobCalls.h"
-
-template <class Dialer>
-bool
-CallJob(int debugSection, int debugLevel, const char *fileName, int fileLine,
- const char *callName, const Dialer &dialer)
-{
- AsyncCall::Pointer call = asyncCall(debugSection, debugLevel, callName, dialer);
- return ScheduleCall(fileName, fileLine, call);
-}
-
-
-#define CallJobHere(debugSection, debugLevel, job, method) \
- CallJob((debugSection), (debugLevel), __FILE__, __LINE__, #method, \
- MemFun((job), &method))
-
-#define CallJobHere1(debugSection, debugLevel, job, method, arg1) \
- CallJob((debugSection), (debugLevel), __FILE__, __LINE__, #method, \
- MemFun((job), &method, (arg1)))
-
-
#endif /* SQUID_ASYNC_JOB_H */
#define SQUID_ASYNCJOBCALLS_H
#include "base/AsyncJob.h"
+#include "base/CbcPointer.h"
+
+/**
+ \ingroup AsyncJobAPI
+ * This is a base class for all job call dialers. It does all the job
+ * dialing logic (debugging, handling exceptions, etc.) except for calling
+ * the job method. The latter requires knowing the number and type of method
+ * parameters. Thus, we add a dial() virtual method that the MemFunT templates
+ * below implement for us, calling the job's method with the right params.
+ */
+template <class Job>
+class JobDialer: public CallDialer
+{
+public:
+ typedef Job DestClass;
+ typedef CbcPointer<Job> JobPointer;
+
+ JobDialer(const JobPointer &aJob);
+ JobDialer(const JobDialer &d);
+
+ virtual bool canDial(AsyncCall &call);
+ void dial(AsyncCall &call);
+
+ JobPointer job;
+
+protected:
+ virtual void doDial() = 0; // actually calls the job method
+
+private:
+ // not implemented and should not be needed
+ JobDialer &operator =(const JobDialer &);
+};
+
+/// schedule an async job call using a dialer; use CallJobHere macros instead
+template <class Dialer>
+bool
+CallJob(int debugSection, int debugLevel, const char *fileName, int fileLine,
+ const char *callName, const Dialer &dialer)
+{
+ AsyncCall::Pointer call = asyncCall(debugSection, debugLevel, callName, dialer);
+ return ScheduleCall(fileName, fileLine, call);
+}
+
+
+#define CallJobHere(debugSection, debugLevel, job, Class, method) \
+ CallJob((debugSection), (debugLevel), __FILE__, __LINE__, \
+ (#Class "::" #method), \
+ JobMemFun<Class>((job), &Class::method))
+
+#define CallJobHere1(debugSection, debugLevel, job, Class, method, arg1) \
+ CallJob((debugSection), (debugLevel), __FILE__, __LINE__, \
+ (#Class "::" #method), \
+ JobMemFun<Class>((job), &Class::method, (arg1)))
+
+
+/// Convenience macro to create a Dialer-based job callback
+#define JobCallback(dbgSection, dbgLevel, Dialer, job, method) \
+ asyncCall((dbgSection), (dbgLevel), #method, \
+ Dialer(CbcPointer<Dialer::DestClass>(job), &method))
+
/*
* *MemFunT are member function (i.e., class method) wrappers. They store
// Arity names are from http://en.wikipedia.org/wiki/Arity
-template <class C>
-class NullaryMemFunT: public JobDialer
+template <class Job>
+class NullaryMemFunT: public JobDialer<Job>
{
public:
- typedef void (C::*Method)();
- explicit NullaryMemFunT(C *anObject, Method aMethod):
- JobDialer(anObject), object(anObject), method(aMethod) {}
+ typedef void (Job::*Method)();
+ explicit NullaryMemFunT(const CbcPointer<Job> &aJob, Method aMethod):
+ JobDialer<Job>(aJob), method(aMethod) {}
virtual void print(std::ostream &os) const { os << "()"; }
public:
- C *object;
Method method;
protected:
- virtual void doDial() { (object->*method)(); }
+ virtual void doDial() { ((&(*this->job))->*method)(); }
};
-template <class C, class Argument1>
-class UnaryMemFunT: public JobDialer
+template <class Job, class Argument1>
+class UnaryMemFunT: public JobDialer<Job>
{
public:
- typedef void (C::*Method)(Argument1);
- explicit UnaryMemFunT(C *anObject, Method aMethod, const Argument1 &anArg1):
- JobDialer(anObject),
- object(anObject), method(aMethod), arg1(anArg1) {}
+ typedef void (Job::*Method)(Argument1);
+ explicit UnaryMemFunT(const CbcPointer<Job> &aJob, Method aMethod,
+ const Argument1 &anArg1): JobDialer<Job>(aJob),
+ method(aMethod), arg1(anArg1) {}
virtual void print(std::ostream &os) const { os << '(' << arg1 << ')'; }
public:
- C *object;
Method method;
Argument1 arg1;
protected:
- virtual void doDial() { (object->*method)(arg1); }
+ virtual void doDial() { ((&(*this->job))->*method)(arg1); }
};
// ... add more as needed
template <class C>
NullaryMemFunT<C>
-MemFun(C *object, typename NullaryMemFunT<C>::Method method)
+JobMemFun(const CbcPointer<C> &job, typename NullaryMemFunT<C>::Method method)
{
- return NullaryMemFunT<C>(object, method);
+ return NullaryMemFunT<C>(job, method);
}
template <class C, class Argument1>
UnaryMemFunT<C, Argument1>
-MemFun(C *object, typename UnaryMemFunT<C, Argument1>::Method method,
- Argument1 arg1)
+JobMemFun(const CbcPointer<C> &job, typename UnaryMemFunT<C, Argument1>::Method method,
+ Argument1 arg1)
+{
+ return UnaryMemFunT<C, Argument1>(job, method, arg1);
+}
+
+
+// inlined methods
+
+template<class Job>
+JobDialer<Job>::JobDialer(const JobPointer &aJob): job(aJob)
+{
+}
+
+template<class Job>
+JobDialer<Job>::JobDialer(const JobDialer<Job> &d): CallDialer(d), job(d.job)
{
- return UnaryMemFunT<C, Argument1>(object, method, arg1);
+}
+
+template<class Job>
+bool
+JobDialer<Job>::canDial(AsyncCall &call)
+{
+ if (!job)
+ return call.cancel("job gone");
+
+ return job->canBeCalled(call);
+}
+
+template<class Job>
+void
+JobDialer<Job>::dial(AsyncCall &call)
+{
+ job->callStart(call);
+
+ try {
+ doDial();
+ } catch (const std::exception &e) {
+ debugs(call.debugSection, 3,
+ HERE << call.name << " threw exception: " << e.what());
+ job->callException(e);
+ }
+
+ job->callEnd(); // may delete job
}
#endif /* SQUID_ASYNCJOBCALLS_H */
--- /dev/null
+/*
+ * $Id$
+ */
+
+#ifndef SQUID_CBC_POINTER_H
+#define SQUID_CBC_POINTER_H
+
+#include "TextException.h"
+#include "cbdata.h"
+
+/**
+ \ingroup CBDATAAPI
+ *
+ * Safely points to a cbdata-protected class (cbc), such as an AsyncJob.
+ * When a cbc we communicate with disappears without
+ * notice or a notice has not reached us yet, this class prevents
+ * dereferencing the pointer to the gone cbc object.
+ */
+template<class Cbc>
+class CbcPointer
+{
+public:
+ CbcPointer(); // a nil pointer
+ CbcPointer(Cbc *aCbc);
+ CbcPointer(const CbcPointer &p);
+ ~CbcPointer();
+
+ Cbc *raw() const; ///< a temporary raw Cbc pointer; may be invalid
+ Cbc *get() const; ///< a temporary valid raw Cbc pointer or NULL
+ Cbc &operator *() const; ///< a valid Cbc reference or exception
+ Cbc *operator ->() const; ///< a valid Cbc pointer or exception
+
+ // no bool operator because set() != valid()
+ bool set() const { return cbc != NULL; } ///< was set but may be invalid
+ Cbc *valid() const { return get(); } ///< was set and is valid
+ bool operator !() const { return !valid(); } ///< invalid or was not set
+ bool operator ==(const CbcPointer<Cbc> &o) const { return lock == o.lock; }
+
+ CbcPointer &operator =(const CbcPointer &p);
+
+ /// support converting a child cbc pointer into a parent cbc pointer
+ template <typename Other>
+ CbcPointer(const CbcPointer<Other> &o): cbc(o.raw()), lock(NULL) {
+ if (o.valid())
+ lock = cbdataReference(o->toCbdata());
+ }
+
+ /// support assigning a child cbc pointer to a parent cbc pointer
+ template <typename Other>
+ CbcPointer &operator =(const CbcPointer<Other> &o) {
+ clear();
+ cbc = o.raw(); // so that set() is accurate
+ if (o.valid())
+ lock = cbdataReference(o->toCbdata());
+ return *this;
+ }
+
+ void clear(); ///< make pointer not set; does not invalidate cbdata
+
+ std::ostream &print(std::ostream &os) const;
+
+private:
+ Cbc *cbc; // a possibly invalid pointer to a cbdata class
+ void *lock; // a valid pointer to cbc's cbdata or nil
+};
+
+template <class Cbc>
+inline
+std::ostream &operator <<(std::ostream &os, const CbcPointer<Cbc> &p)
+{
+ return p.print(os);
+}
+
+// inlined methods
+
+template<class Cbc>
+CbcPointer<Cbc>::CbcPointer(): cbc(NULL), lock(NULL)
+{
+}
+
+template<class Cbc>
+CbcPointer<Cbc>::CbcPointer(Cbc *aCbc): cbc(aCbc), lock(NULL)
+{
+ if (cbc)
+ lock = cbdataReference(cbc->toCbdata());
+}
+
+template<class Cbc>
+CbcPointer<Cbc>::CbcPointer(const CbcPointer &d): cbc(d.cbc), lock(NULL)
+{
+ if (d.lock && cbdataReferenceValid(d.lock))
+ lock = cbdataReference(d.lock);
+}
+
+template<class Cbc>
+CbcPointer<Cbc>::~CbcPointer()
+{
+ clear();
+}
+
+template<class Cbc>
+CbcPointer<Cbc> &CbcPointer<Cbc>::operator =(const CbcPointer &d)
+{
+ clear();
+ cbc = d.cbc;
+ if (d.lock && cbdataReferenceValid(d.lock))
+ lock = cbdataReference(d.lock);
+ return *this;
+}
+
+template<class Cbc>
+void
+CbcPointer<Cbc>::clear()
+{
+ cbdataReferenceDone(lock); // lock may be nil before and will be nil after
+ cbc = NULL;
+}
+
+template<class Cbc>
+Cbc *
+CbcPointer<Cbc>::raw() const
+{
+ return cbc;
+}
+
+template<class Cbc>
+Cbc *
+CbcPointer<Cbc>::get() const
+{
+ return (lock && cbdataReferenceValid(lock)) ? cbc : NULL;
+}
+
+template<class Cbc>
+Cbc &
+CbcPointer<Cbc>::operator *() const
+{
+ Cbc *c = get();
+ Must(c);
+ return *c;
+}
+
+template<class Cbc>
+Cbc *
+CbcPointer<Cbc>::operator ->() const
+{
+ Cbc *c = get();
+ Must(c);
+ return c;
+}
+
+template <class Cbc>
+std::ostream &CbcPointer<Cbc>::print(std::ostream &os) const
+{
+ return os << cbc << '/' << lock;
+}
+
+
+#endif /* SQUID_CBC_POINTER_H */
AsyncJob.cc \
AsyncJobCalls.h \
AsyncCallQueue.cc \
- AsyncCallQueue.h
+ AsyncCallQueue.h \
+ CbcPointer.h
makeSpaceAvailable();
typedef CommCbMemFunT<ConnStateData, CommIoCbParams> Dialer;
- reader = asyncCall(33, 5, "ConnStateData::clientReadRequest",
- Dialer(this, &ConnStateData::clientReadRequest));
+ reader = JobCallback(33, 5,
+ Dialer, this, ConnStateData::clientReadRequest);
comm_read(fd, in.addressToReadInto(), getAvailableBufferLength(), reader);
}
* Set the timeout BEFORE calling clientReadRequest().
*/
typedef CommCbMemFunT<ConnStateData, CommTimeoutCbParams> TimeoutDialer;
- AsyncCall::Pointer timeoutCall = asyncCall(33, 5, "ConnStateData::requestTimeout",
- TimeoutDialer(this, &ConnStateData::requestTimeout));
+ AsyncCall::Pointer timeoutCall = JobCallback(33, 5,
+ TimeoutDialer, this, ConnStateData::requestTimeout);
commSetTimeout(fd, Config.Timeout.persistent_request, timeoutCall);
readSomeData();
* if we don't close() here, we still need a timeout handler!
*/
typedef CommCbMemFunT<ConnStateData, CommTimeoutCbParams> TimeoutDialer;
- AsyncCall::Pointer timeoutCall = asyncCall(33, 5, "ConnStateData::requestTimeout",
- TimeoutDialer(this,&ConnStateData::requestTimeout));
+ AsyncCall::Pointer timeoutCall = JobCallback(33, 5,
+ TimeoutDialer, this, ConnStateData::requestTimeout);
commSetTimeout(io.fd, 30, timeoutCall);
/*
connState = connStateCreate(&details->peer, &details->me, newfd, s);
typedef CommCbMemFunT<ConnStateData, CommCloseCbParams> Dialer;
- AsyncCall::Pointer call = asyncCall(33, 5, "ConnStateData::connStateClosed",
- Dialer(connState, &ConnStateData::connStateClosed));
+ AsyncCall::Pointer call = JobCallback(33, 5,
+ Dialer, connState, ConnStateData::connStateClosed);
comm_add_close_handler(newfd, call);
if (Config.onoff.log_fqdn)
fqdncache_gethostbyaddr(details->peer, FQDN_LOOKUP_IF_MISS);
typedef CommCbMemFunT<ConnStateData, CommTimeoutCbParams> TimeoutDialer;
- AsyncCall::Pointer timeoutCall = asyncCall(33, 5, "ConnStateData::requestTimeout",
- TimeoutDialer(connState,&ConnStateData::requestTimeout));
+ AsyncCall::Pointer timeoutCall = JobCallback(33, 5,
+ TimeoutDialer, connState, ConnStateData::requestTimeout);
commSetTimeout(newfd, Config.Timeout.read, timeoutCall);
#if USE_IDENT
ConnStateData *connState = connStateCreate(details->peer, details->me,
newfd, &s->http);
typedef CommCbMemFunT<ConnStateData, CommCloseCbParams> Dialer;
- AsyncCall::Pointer call = asyncCall(33, 5, "ConnStateData::connStateClosed",
- Dialer(connState, &ConnStateData::connStateClosed));
+ AsyncCall::Pointer call = JobCallback(33, 5,
+ Dialer, connState, ConnStateData::connStateClosed);
comm_add_close_handler(newfd, call);
if (Config.onoff.log_fqdn)
fqdncache_gethostbyaddr(details->peer, FQDN_LOOKUP_IF_MISS);
typedef CommCbMemFunT<ConnStateData, CommTimeoutCbParams> TimeoutDialer;
- AsyncCall::Pointer timeoutCall = asyncCall(33, 5, "ConnStateData::requestTimeout",
- TimeoutDialer(connState,&ConnStateData::requestTimeout));
+ AsyncCall::Pointer timeoutCall = JobCallback(33, 5,
+ TimeoutDialer, connState, ConnStateData::requestTimeout);
commSetTimeout(newfd, Config.Timeout.request, timeoutCall);
#if USE_IDENT
fd_note(pinning_fd, desc);
typedef CommCbMemFunT<ConnStateData, CommCloseCbParams> Dialer;
- pinning.closeHandler = asyncCall(33, 5, "ConnStateData::clientPinnedConnectionClosed",
- Dialer(this, &ConnStateData::clientPinnedConnectionClosed));
+ pinning.closeHandler = JobCallback(33, 5,
+ Dialer, this, ConnStateData::clientPinnedConnectionClosed);
comm_add_close_handler(pinning_fd, pinning.closeHandler);
}
assert(!virginHeadSource);
assert(!adaptedBodySource);
virginHeadSource = initiateAdaptation(
- new Adaptation::Iterator(this, request, NULL, g));
+ new Adaptation::Iterator(request, NULL, g));
// we could try to guess whether we can bypass this adaptation
// initiation failure, but it should not really happen
- assert(virginHeadSource != NULL); // Must, really
+ Must(initiated(virginHeadSource));
}
void
void endRequestSatisfaction();
private:
- Adaptation::Initiate *virginHeadSource;
+ CbcPointer<Adaptation::Initiate> virginHeadSource;
BodyPipe::Pointer adaptedBodySource;
bool request_satisfaction_mode;
flags.rest_supported = 1;
typedef CommCbMemFunT<FtpStateData, CommCloseCbParams> Dialer;
- AsyncCall::Pointer closer = asyncCall(9, 5, "FtpStateData::ctrlClosed",
- Dialer(this, &FtpStateData::ctrlClosed));
+ AsyncCall::Pointer closer = JobCallback(9, 5,
+ Dialer, this, FtpStateData::ctrlClosed);
ctrl.opened(theFwdState->server_fd, closer);
if (request->method == METHOD_PUT)
data.read_pending = true;
typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
- AsyncCall::Pointer timeoutCall = asyncCall(9, 5, "FtpStateData::ftpTimeout",
- TimeoutDialer(this,&FtpStateData::ftpTimeout));
+ AsyncCall::Pointer timeoutCall = JobCallback(9, 5,
+ TimeoutDialer, this, FtpStateData::ftpTimeout);
commSetTimeout(data.fd, Config.Timeout.read, timeoutCall);
debugs(9,5,HERE << "queueing read on FD " << data.fd);
typedef CommCbMemFunT<FtpStateData, CommIoCbParams> Dialer;
entry->delayAwareRead(data.fd, data.readBuf->space(), read_sz,
- asyncCall(9, 5, "FtpStateData::dataRead",
- Dialer(this, &FtpStateData::dataRead)));
+ JobCallback(9, 5, Dialer, this, FtpStateData::dataRead));
}
void
if (ignoreErrno(io.xerrno)) {
typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
- AsyncCall::Pointer timeoutCall = asyncCall(9, 5, "FtpStateData::ftpTimeout",
- TimeoutDialer(this,&FtpStateData::ftpTimeout));
+ AsyncCall::Pointer timeoutCall = JobCallback(9, 5,
+ TimeoutDialer, this, FtpStateData::ftpTimeout);
commSetTimeout(io.fd, Config.Timeout.read, timeoutCall);
maybeReadVirginBody();
}
typedef CommCbMemFunT<FtpStateData, CommIoCbParams> Dialer;
- AsyncCall::Pointer call = asyncCall(9, 5, "FtpStateData::ftpWriteCommandCallback",
- Dialer(this, &FtpStateData::ftpWriteCommandCallback));
+ AsyncCall::Pointer call = JobCallback(9, 5,
+ Dialer, this, FtpStateData::ftpWriteCommandCallback);
comm_write(ctrl.fd,
ctrl.last_command,
strlen(ctrl.last_command),
} else {
/* XXX What about Config.Timeout.read? */
typedef CommCbMemFunT<FtpStateData, CommIoCbParams> Dialer;
- AsyncCall::Pointer reader=asyncCall(9, 5, "FtpStateData::ftpReadControlReply",
- Dialer(this, &FtpStateData::ftpReadControlReply));
+ AsyncCall::Pointer reader = JobCallback(9, 5,
+ Dialer, this, FtpStateData::ftpReadControlReply);
comm_read(ctrl.fd, ctrl.buf + ctrl.offset, ctrl.size - ctrl.offset, reader);
/*
* Cancel the timeout on the Data socket (if any) and
}
typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
- AsyncCall::Pointer timeoutCall = asyncCall(9, 5, "FtpStateData::ftpTimeout",
- TimeoutDialer(this,&FtpStateData::ftpTimeout));
+ AsyncCall::Pointer timeoutCall = JobCallback(9, 5,
+ TimeoutDialer, this, FtpStateData::ftpTimeout);
commSetTimeout(ctrl.fd, Config.Timeout.read, timeoutCall);
}
* dont acknowledge PASV commands.
*/
typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
- AsyncCall::Pointer timeoutCall = asyncCall(9, 5, "FtpStateData::ftpTimeout",
- TimeoutDialer(ftpState,&FtpStateData::ftpTimeout));
+ AsyncCall::Pointer timeoutCall = JobCallback(9, 5,
+ TimeoutDialer, ftpState, FtpStateData::ftpTimeout);
commSetTimeout(ftpState->data.fd, 15, timeoutCall);
}
comm_close(io.nfd);
typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
- AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
- acceptDialer(this, &FtpStateData::ftpAcceptDataConnection));
+ AsyncCall::Pointer acceptCall = JobCallback(11, 5,
+ acceptDialer, this, FtpStateData::ftpAcceptDataConnection);
comm_accept(data.fd, acceptCall);
return;
}
commSetTimeout(ctrl.fd, -1, nullCall);
typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
- AsyncCall::Pointer timeoutCall = asyncCall(9, 5, "FtpStateData::ftpTimeout",
- TimeoutDialer(this,&FtpStateData::ftpTimeout));
+ AsyncCall::Pointer timeoutCall = JobCallback(9, 5,
+ TimeoutDialer, this, FtpStateData::ftpTimeout);
commSetTimeout(data.fd, Config.Timeout.read, timeoutCall);
/*\todo XXX We should have a flag to track connect state...
commSetTimeout(ctrl.fd, -1, nullCall);
typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
- AsyncCall::Pointer timeoutCall = asyncCall(9, 5, "FtpStateData::ftpTimeout",
- TimeoutDialer(this,&FtpStateData::ftpTimeout));
+ AsyncCall::Pointer timeoutCall = JobCallback(9, 5,
+ TimeoutDialer, this, FtpStateData::ftpTimeout);
commSetTimeout(data.fd, Config.Timeout.read, timeoutCall);
* When client code is 150 with a hostname, Accept data channel. */
debugs(9, 3, "ftpReadStor: accepting data channel");
typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
- AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
- acceptDialer(this, &FtpStateData::ftpAcceptDataConnection));
+ AsyncCall::Pointer acceptCall = JobCallback(11, 5,
+ acceptDialer, this, FtpStateData::ftpAcceptDataConnection);
comm_accept(data.fd, acceptCall);
} else {
} else if (code == 150) {
/* Accept data channel */
typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
- AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
- acceptDialer(ftpState, &FtpStateData::ftpAcceptDataConnection));
+ AsyncCall::Pointer acceptCall = JobCallback(11, 5,
+ acceptDialer, ftpState, FtpStateData::ftpAcceptDataConnection);
comm_accept(ftpState->data.fd, acceptCall);
/*
commSetTimeout(ftpState->ctrl.fd, -1, nullCall);
typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
- AsyncCall::Pointer timeoutCall = asyncCall(9, 5, "FtpStateData::ftpTimeout",
- TimeoutDialer(ftpState,&FtpStateData::ftpTimeout));
+ AsyncCall::Pointer timeoutCall = JobCallback(9, 5,
+ TimeoutDialer, ftpState,FtpStateData::ftpTimeout);
commSetTimeout(ftpState->data.fd, Config.Timeout.read, timeoutCall);
return;
} else if (!ftpState->flags.tried_nlst && code > 300) {
} else if (code == 150) {
/* Accept data channel */
typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
- AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
- acceptDialer(ftpState, &FtpStateData::ftpAcceptDataConnection));
+ AsyncCall::Pointer acceptCall = JobCallback(11, 5,
+ acceptDialer, ftpState, FtpStateData::ftpAcceptDataConnection);
comm_accept(ftpState->data.fd, acceptCall);
/*
* Cancel the timeout on the Control socket and establish one
commSetTimeout(ftpState->ctrl.fd, -1, nullCall);
typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
- AsyncCall::Pointer timeoutCall = asyncCall(9, 5, "FtpStateData::ftpTimeout",
- TimeoutDialer(ftpState,&FtpStateData::ftpTimeout));
+ AsyncCall::Pointer timeoutCall = JobCallback(9, 5,
+ TimeoutDialer, ftpState,FtpStateData::ftpTimeout);
commSetTimeout(ftpState->data.fd, Config.Timeout.read, timeoutCall);
} else if (code >= 300) {
if (!ftpState->flags.try_slash_hack) {
FtpStateData::dataCloser()
{
typedef CommCbMemFunT<FtpStateData, CommCloseCbParams> Dialer;
- return asyncCall(9, 5, "FtpStateData::dataClosed",
- Dialer(this, &FtpStateData::dataClosed));
+ return JobCallback(9, 5, Dialer, this, FtpStateData::dataClosed);
}
/// configures the channel with a descriptor and registers a close handler
* register the handler to free HTTP state data when the FD closes
*/
typedef CommCbMemFunT<HttpStateData, CommCloseCbParams> Dialer;
- closeHandler = asyncCall(9, 5, "httpStateData::httpStateConnClosed",
- Dialer(this,&HttpStateData::httpStateConnClosed));
+ closeHandler = JobCallback(9, 5,
+ Dialer, this, HttpStateData::httpStateConnClosed);
comm_add_close_handler(fd, closeHandler);
}
flags.do_next_read = 0;
typedef CommCbMemFunT<HttpStateData, CommIoCbParams> Dialer;
entry->delayAwareRead(fd, readBuf->space(read_size), read_size,
- asyncCall(11, 5, "HttpStateData::readReply",
- Dialer(this, &HttpStateData::readReply)));
+ JobCallback(11, 5, Dialer, this, HttpStateData::readReply));
}
}
* request bodies.
*/
typedef CommCbMemFunT<HttpStateData, CommTimeoutCbParams> TimeoutDialer;
- AsyncCall::Pointer timeoutCall = asyncCall(11, 5, "HttpStateData::httpTimeout",
- TimeoutDialer(this,&HttpStateData::httpTimeout));
+ AsyncCall::Pointer timeoutCall = JobCallback(11, 5,
+ TimeoutDialer, this, HttpStateData::httpTimeout);
commSetTimeout(fd, Config.Timeout.read, timeoutCall);
}
typedef CommCbMemFunT<HttpStateData, CommTimeoutCbParams> TimeoutDialer;
- AsyncCall::Pointer timeoutCall = asyncCall(11, 5, "HttpStateData::httpTimeout",
- TimeoutDialer(this,&HttpStateData::httpTimeout));
+ AsyncCall::Pointer timeoutCall = JobCallback(11, 5,
+ TimeoutDialer, this, HttpStateData::httpTimeout);
commSetTimeout(fd, Config.Timeout.lifetime, timeoutCall);
flags.do_next_read = 1;
maybeReadVirginBody();
if (!startRequestBodyFlow()) // register to receive body data
return false;
typedef CommCbMemFunT<HttpStateData, CommIoCbParams> Dialer;
- Dialer dialer(this, &HttpStateData::sentRequestBody);
- requestSender = asyncCall(11,5, "HttpStateData::sentRequestBody", dialer);
+ requestSender = JobCallback(11,5,
+ Dialer, this, HttpStateData::sentRequestBody);
} else {
assert(!requestBodySource);
typedef CommCbMemFunT<HttpStateData, CommIoCbParams> Dialer;
- Dialer dialer(this, &HttpStateData::sendComplete);
- requestSender = asyncCall(11,5, "HttpStateData::SendComplete", dialer);
+ requestSender = JobCallback(11,5,
+ Dialer, this, HttpStateData::sendComplete);
}
if (_peer != NULL) {
}
typedef CommCbMemFunT<HttpStateData, CommIoCbParams> Dialer;
- Dialer dialer(this, &HttpStateData::sendComplete);
- AsyncCall::Pointer call= asyncCall(11,5, "HttpStateData::SendComplete", dialer);
+ AsyncCall::Pointer call = JobCallback(11,5,
+ Dialer, this, HttpStateData::sendComplete);
comm_write(fd, "\r\n", 2, call);
}
return;