/*
- * $Id: ICAPConfig.cc,v 1.20 2007/09/27 15:31:15 rousskov Exp $
+ * $Id: ICAPConfig.cc,v 1.21 2008/02/12 23:12:45 rousskov Exp $
*
* SQUID Web Proxy Cache http://www.squid-cache.org/
* ----------------------------------------------------------
HttpRequest *aReq,
HttpReply *aRep,
ICAPAccessCheckCallback *aCallback,
- void *aCallbackData)
+ void *aCallbackData): AsyncJob("ICAPAccessCheck"), done(FALSE)
{
method = aMethod;
point = aPoint;
/*
* We use an event here to break deep function call sequences
*/
- eventAdd("ICAPAccessCheckCallbackEvent",
- ICAPAccessCheckCallbackEvent,
- ac,
- 0.0,
- 0,
- true);
+ CallJobHere(93, 5, ac, ICAPAccessCheck::do_callback);
}
+#if 0
void
ICAPAccessCheck::ICAPAccessCheckCallbackEvent(void *data)
{
ac->do_callback();
delete ac;
}
+#endif
void
ICAPAccessCheck::do_callback()
}
callback(service, validated_cbdata);
+ done = TRUE;
}
ICAPServiceRep::Pointer
/*
- * $Id: ICAPConfig.h,v 1.16 2007/06/28 15:28:59 rousskov Exp $
+ * $Id: ICAPConfig.h,v 1.17 2008/02/12 23:12:45 rousskov Exp $
*
*
* SQUID Web Proxy Cache http://www.squid-cache.org/
#define SQUID_ICAPCONFIG_H
#include "event.h"
+#include "AsyncCall.h"
#include "ICAPServiceRep.h"
class acl_access;
int prepare();
};
-class ICAPAccessCheck
+class ICAPAccessCheck: public virtual AsyncJob
{
public:
String matchedClass;
void do_callback();
ICAPServiceRep::Pointer findBestService(ICAPClass *c, bool preferUp);
+ bool done;
public:
void check();
void checkCandidates();
static void ICAPAccessCheckCallbackWrapper(int, void*);
+#if 0
static EVH ICAPAccessCheckCallbackEvent;
+#endif
+//AsyncJob virtual methods
+ virtual bool doneAll() const { return AsyncJob::doneAll() && done;}
private:
CBDATA_CLASS2(ICAPAccessCheck);
#include "ICAPInitiator.h"
#include "ICAPInitiate.h"
-/* The call objects below are not cbdata-protected or refcounted because
- * nobody holds a pointer to them except for the event queue.
- *
- * The calls do check the Initiator pointer to see if that is still valid.
- *
- * TODO: convert this to a generic AsyncCall1 class
- * TODO: mempool kids of this class.
- */
-
-/* Event data and callback wrapper to call noteIcapAnswer with
- * the answer message as a parameter.
- */
-class ICAPAnswerCall {
-public:
- // use this function to make an asynchronous call
- static void Schedule(const ICAPInitiatorHolder &anInitiator, HttpMsg *aMessage);
-
- static void Wrapper(void *data);
-
-protected:
- ICAPAnswerCall(const ICAPInitiatorHolder &anInitiator, HttpMsg *aMessage);
- ~ICAPAnswerCall();
-
- void schedule();
- void call();
-
- ICAPInitiatorHolder theInitiator;
- HttpMsg *theMessage;
-};
-
-
-/* Event data and callback wrapper to call noteIcapQueryAbort with
- * the termination status as a parameter.
- *
- * XXX: This class is a clone of ICAPAnswerCall.
- */
-class ICAPQueryAbortCall {
+// ICAPInitiator::noteIcapAnswer Dialer locks/unlocks the message in transit
+// TODO: replace HTTPMSGLOCK with general RefCounting and delete this class
+class IcapAnswerDialer: public UnaryMemFunT<ICAPInitiator, HttpMsg*>
+{
public:
- // use this function to make an asynchronous call
- static void Schedule(const ICAPInitiatorHolder &anInitiator, bool beFinal);
-
- static void Wrapper(void *data);
-
-protected:
- ICAPQueryAbortCall(const ICAPInitiatorHolder &anInitiator, bool beFinal);
-
- void schedule();
- void call();
+ typedef UnaryMemFunT<ICAPInitiator, HttpMsg*> Parent;
- ICAPInitiatorHolder theInitiator;
- bool isFinal;
+ IcapAnswerDialer(ICAPInitiator *obj, Parent::Method meth, HttpMsg *msg):
+ Parent(obj, meth, msg) { HTTPMSGLOCK(arg1); }
+ IcapAnswerDialer(const IcapAnswerDialer &d):
+ Parent(d) { HTTPMSGLOCK(arg1); }
+ virtual ~IcapAnswerDialer() { HTTPMSGUNLOCK(arg1); }
};
void ICAPInitiate::sendAnswer(HttpMsg *msg)
{
- ICAPAnswerCall::Schedule(theInitiator, msg);
+ assert(msg);
+ CallJob(93, 5, __FILE__, __LINE__, "ICAPInitiator::noteIcapAnswer",
+ IcapAnswerDialer(theInitiator.ptr, &ICAPInitiator::noteIcapAnswer, msg));
clearInitiator();
}
+
void ICAPInitiate::tellQueryAborted(bool final)
{
- ICAPQueryAbortCall::Schedule(theInitiator, final);
+ CallJobHere1(93, 5, theInitiator.ptr, ICAPInitiator::noteIcapQueryAbort, final);
clearInitiator();
}
assert(false);
return *this;
}
-
-/* ICAPAnswerCall */
-
-ICAPAnswerCall::ICAPAnswerCall(const ICAPInitiatorHolder &anInitiator, HttpMsg *aMessage):
- theInitiator(anInitiator), theMessage(0)
-{
- if (theInitiator) {
- assert(aMessage);
- theMessage = HTTPMSGLOCK(aMessage);
- }
-}
-
-void ICAPAnswerCall::schedule()
-{
- if (theInitiator) {
- debugs(93,3, __FILE__ << "(" << __LINE__ << ") will call " <<
- theInitiator << "->ICAPInitiator::noteIcapAnswer(" <<
- theMessage << ")");
- eventAdd("ICAPInitiator::noteIcapAnswer",
- &ICAPAnswerCall::Wrapper, this, 0.0, 0, false);
- } else {
- debugs(93,3, __FILE__ << "(" << __LINE__ << ") will not call " <<
- theInitiator << "->ICAPInitiator::noteIcapAnswer(" <<
- theMessage << ")");
- }
-}
-
-ICAPAnswerCall::~ICAPAnswerCall()
-{
- if (theInitiator)
- HTTPMSGUNLOCK(theMessage);
-}
-
-void ICAPAnswerCall::Wrapper(void *data)
-{
- assert(data);
- ICAPAnswerCall *c = static_cast<ICAPAnswerCall*>(data);
- c->call();
- delete c;
-}
-
-void ICAPAnswerCall::call() {
- assert(theInitiator);
- if (cbdataReferenceValid(theInitiator.cbdata)) {
- debugs(93, 3, "entering " <<
- theInitiator << "->ICAPInitiator::noteIcapAnswer(" <<
- theMessage << ")");
- theInitiator.ptr->noteIcapAnswer(theMessage);
- debugs(93, 3, "exiting " <<
- theInitiator << "->ICAPInitiator::noteIcapAnswer(" <<
- theMessage << ")");
- } else {
- debugs(93, 3, "ignoring " <<
- theInitiator << "->ICAPInitiator::noteIcapAnswer(" <<
- theMessage << ")");
- }
-}
-
-void ICAPAnswerCall::Schedule(const ICAPInitiatorHolder &anInitiator, HttpMsg *aMessage)
-{
- ICAPAnswerCall *call = new ICAPAnswerCall(anInitiator, aMessage);
- call->schedule();
- // The call object is deleted in ICAPAnswerCall::Wrapper
-}
-
-
-/* ICAPQueryAbortCall */
-
-ICAPQueryAbortCall::ICAPQueryAbortCall(const ICAPInitiatorHolder &anInitiator, bool beFinal):
- theInitiator(anInitiator), isFinal(beFinal)
-{
-}
-
-void ICAPQueryAbortCall::schedule()
-{
- if (theInitiator) {
- debugs(93,3, __FILE__ << "(" << __LINE__ << ") will call " <<
- theInitiator << "->ICAPInitiator::noteIcapQueryAbort(" <<
- isFinal << ")");
- eventAdd("ICAPInitiator::noteIcapQueryAbort",
- &ICAPQueryAbortCall::Wrapper, this, 0.0, 0, false);
- } else {
- debugs(93,3, __FILE__ << "(" << __LINE__ << ") will not call " <<
- theInitiator << "->ICAPInitiator::noteIcapQueryAbort(" <<
- isFinal << ")");
- }
-}
-
-void ICAPQueryAbortCall::Wrapper(void *data)
-{
- assert(data);
- ICAPQueryAbortCall *c = static_cast<ICAPQueryAbortCall*>(data);
- c->call();
- delete c;
-}
-
-void ICAPQueryAbortCall::call() {
- assert(theInitiator);
- if (cbdataReferenceValid(theInitiator.cbdata)) {
- debugs(93, 3, "entering " <<
- theInitiator << "->ICAPInitiator::noteIcapQueryAbort(" <<
- isFinal << ")");
- theInitiator.ptr->noteIcapQueryAbort(isFinal);
- debugs(93, 3, "exiting " <<
- theInitiator << "->ICAPInitiator::noteIcapQueryAbort(" <<
- isFinal << ")");
- } else {
- debugs(93, 3, "ignoring " <<
- theInitiator << "->ICAPInitiator::noteIcapQueryAbort(" <<
- isFinal << ")");
- }
-}
-
-void ICAPQueryAbortCall::Schedule(const ICAPInitiatorHolder &anInitiator, bool beFinal)
-{
- ICAPQueryAbortCall *call = new ICAPQueryAbortCall(anInitiator, beFinal);
- call->schedule();
- // The call object is deleted in ICAPQueryAbortCall::Wrapper
-}
/*
- * $Id: ICAPInitiate.h,v 1.1 2007/05/08 16:32:11 rousskov Exp $
+ * $Id: ICAPInitiate.h,v 1.2 2008/02/12 23:12:45 rousskov Exp $
*
*
* SQUID Web Proxy Cache http://www.squid-cache.org/
#include "comm.h"
#include "MemBuf.h"
#include "ICAPServiceRep.h"
-#include "AsyncJob.h"
+#include "AsyncCall.h"
class HttpMsg;
class ICAPInitiator;
/* Initiator holder associtates an initiator with its cbdata. It is used as
* a temporary hack to make cbdata work with multiple inheritance */
+// TODO: since ICAPInitiator is now an AsyncJob, we do not need this class.
class ICAPInitiatorHolder {
public:
ICAPInitiatorHolder(ICAPInitiator *anInitiator);
*
* This class could have been named ICAPInitiatee.
*/
-class ICAPInitiate: public AsyncJob
+class ICAPInitiate: virtual public AsyncJob
{
public:
// communication with the initiator
virtual void noteInitiatorAborted() = 0;
- AsyncCallWrapper(93,3, ICAPInitiate, noteInitiatorAborted)
protected:
ICAPServiceRep &service();
#include "ICAPInitiator.h"
ICAPInitiate *ICAPInitiator::initiateIcap(ICAPInitiate *x) {
- x = cbdataReference(x);
- return dynamic_cast<ICAPInitiate*>(ICAPInitiate::AsyncStart(x));
+ if ((x = dynamic_cast<ICAPInitiate*>(ICAPInitiate::AsyncStart(x))))
+ x = cbdataReference(x);
+ return x;
}
void ICAPInitiator::clearIcap(ICAPInitiate *&x) {
void ICAPInitiator::announceInitiatorAbort(ICAPInitiate *&x)
{
if (x) {
- AsyncCall(93,5, x, ICAPInitiate::noteInitiatorAborted);
+ CallJobHere(93, 5, x, ICAPInitiate::noteInitiatorAborted);
clearIcap(x);
}
}
/*
- * $Id: ICAPInitiator.h,v 1.2 2007/05/08 16:32:11 rousskov Exp $
+ * $Id: ICAPInitiator.h,v 1.3 2008/02/12 23:12:45 rousskov Exp $
*
*
* SQUID Web Proxy Cache http://www.squid-cache.org/
* or aborting an ICAP transaction.
*/
+#include "AsyncJob.h"
+
class HttpMsg;
class ICAPInitiate;
-class ICAPInitiator
+class ICAPInitiator: virtual public AsyncJob
{
public:
+ ICAPInitiator():AsyncJob("ICAPInitiator"){}
virtual ~ICAPInitiator() {}
// called when ICAP response headers are successfully interpreted
// the final parameter is set to disable bypass or retries
virtual void noteIcapQueryAbort(bool final) = 0;
- // a temporary cbdata-for-multiple inheritance hack, see ICAPInitiator.cc
- virtual void *toCbdata() { return this; }
-
protected:
ICAPInitiate *initiateIcap(ICAPInitiate *x); // locks and returns x
#include "ICAPXaction.h"
-ICAPLauncher::ICAPLauncher(const char *aTypeName, ICAPInitiator *anInitiator, ICAPServiceRep::Pointer &aService):
+ICAPLauncher::ICAPLauncher(const char *aTypeName, ICAPInitiator *anInitiator, ICAPServiceRep::Pointer &aService):AsyncJob(aTypeName),
ICAPInitiate(aTypeName, anInitiator, aService),
theXaction(0), theLaunches(0)
{
void ICAPLauncher::noteIcapAnswer(HttpMsg *message)
{
- AsyncCallEnter(noteIcapAnswer);
-
sendAnswer(message);
clearIcap(theXaction);
Must(done());
-
- AsyncCallExit();
+ debugs(93,3, HERE << "ICAPLauncher::noteIcapAnswer exiting ");
}
void ICAPLauncher::noteInitiatorAborted()
{
- AsyncCallEnter(noteInitiatorAborted);
announceInitiatorAbort(theXaction); // propogate to the transaction
clearInitiator();
Must(done()); // should be nothing else to do
- AsyncCallExit();
}
void ICAPLauncher::noteIcapQueryAbort(bool final)
{
- AsyncCallEnter(noteQueryAbort);
-
clearIcap(theXaction);
// TODO: add more checks from FwdState::checkRetry()?
Must(done()); // swanSong will notify the initiator
}
- AsyncCallExit();
}
bool ICAPLauncher::doneAll() const {
/*
- * $Id: ICAPLauncher.h,v 1.1 2007/05/08 16:32:11 rousskov Exp $
+ * $Id: ICAPLauncher.h,v 1.2 2008/02/12 23:12:45 rousskov Exp $
*
*
* SQUID Web Proxy Cache http://www.squid-cache.org/
virtual void noteIcapAnswer(HttpMsg *message);
virtual void noteIcapQueryAbort(bool final);
- // a temporary cbdata-for-multiple inheritance hack, see ICAPInitiator.cc
- virtual void *toCbdata() { return this; }
-
protected:
// ICAPInitiate API implementation
virtual void start();
ICAPModXact::ICAPModXact(ICAPInitiator *anInitiator, HttpMsg *virginHeader,
HttpRequest *virginCause, ICAPServiceRep::Pointer &aService):
+ AsyncJob("ICAPModXact"),
ICAPXaction("ICAPModXact", anInitiator, aService),
icapReply(NULL),
virginConsumed(0),
// XXX: If commConnectStart in startWriting fails, we may get here
//_after_ the object got destroyed. Somebody please fix commConnectStart!
// TODO: Does re-entrance protection in callStart() solve the above?
-}
-
-static
-void ICAPModXact_noteServiceReady(void *data, ICAPServiceRep::Pointer &)
-{
- ICAPModXact *x = static_cast<ICAPModXact*>(data);
- assert(x);
- x->noteServiceReady();
+ // TODO: Check that comm using AsyncCalls solves this problem.
}
void ICAPModXact::waitForService()
Must(!state.serviceWaiting);
debugs(93, 7, "ICAPModXact will wait for the ICAP service" << status());
state.serviceWaiting = true;
- service().callWhenReady(&ICAPModXact_noteServiceReady, this);
+ AsyncCall::Pointer call = asyncCall(93,5, "ICAPModXact::noteServiceReady",
+ MemFun(this, &ICAPModXact::noteServiceReady));
+ service().callWhenReady(call);
}
void ICAPModXact::noteServiceReady()
{
- ICAPXaction_Enter(noteServiceReady);
-
Must(state.serviceWaiting);
state.serviceWaiting = false;
disableRetries();
throw TexcHere("ICAP service is unusable");
}
-
- ICAPXaction_Exit();
}
void ICAPModXact::startWriting()
{
debugs(93, 5, HERE << "checking whether to write more" << status());
- if (writer) // already writing something
+ if (writer != NULL) // already writing something
return;
switch (state.writing) {
if (state.writing == State::writingReallyDone)
return;
- if (writer) {
+ if (writer != NULL) {
if (nicely) {
debugs(93, 7, HERE << "will wait for the last write" << status());
state.writing = State::writingAlmostDone; // may already be set
void ICAPModXact::readMore()
{
- if (reader || doneReading()) {
+ if (reader != NULL || doneReading()) {
debugs(93,3,HERE << "returning from readMore because reader or doneReading()");
return;
}
}
// HTTP side added virgin body data
-void ICAPModXact::noteMoreBodyDataAvailable(BodyPipe &)
+void ICAPModXact::noteMoreBodyDataAvailable(BodyPipe::Pointer)
{
- ICAPXaction_Enter(noteMoreBodyDataAvailable);
-
writeMore();
if (state.sending == State::sendingVirgin)
echoMore();
-
- ICAPXaction_Exit();
}
// HTTP side sent us all virgin info
-void ICAPModXact::noteBodyProductionEnded(BodyPipe &)
+void ICAPModXact::noteBodyProductionEnded(BodyPipe::Pointer)
{
- ICAPXaction_Enter(noteBodyProductionEnded);
-
Must(virgin.body_pipe->productionEnded());
// push writer and sender in case we were waiting for the last-chunk
if (state.sending == State::sendingVirgin)
echoMore();
-
- ICAPXaction_Exit();
}
// body producer aborted, but the initiator may still want to know
// the answer, even though the HTTP message has been truncated
-void ICAPModXact::noteBodyProducerAborted(BodyPipe &)
+void ICAPModXact::noteBodyProducerAborted(BodyPipe::Pointer)
{
- ICAPXaction_Enter(noteBodyProducerAborted);
-
Must(virgin.body_pipe->productionEnded());
// push writer and sender in case we were waiting for the last-chunk
if (state.sending == State::sendingVirgin)
echoMore();
-
- ICAPXaction_Exit();
}
// adapted body consumer wants more adapted data and
// possibly freed some buffer space
-void ICAPModXact::noteMoreBodySpaceAvailable(BodyPipe &)
+void ICAPModXact::noteMoreBodySpaceAvailable(BodyPipe::Pointer)
{
- ICAPXaction_Enter(noteMoreBodySpaceAvailable);
-
if (state.sending == State::sendingVirgin)
echoMore();
else if (state.sending == State::sendingAdapted)
parseMore();
else
Must(state.sending == State::sendingUndecided);
-
- ICAPXaction_Exit();
}
// adapted body consumer aborted
-void ICAPModXact::noteBodyConsumerAborted(BodyPipe &)
+void ICAPModXact::noteBodyConsumerAborted(BodyPipe::Pointer)
{
- ICAPXaction_Enter(noteBodyConsumerAborted);
-
mustStop("adapted body consumer aborted");
-
- ICAPXaction_Exit();
}
// internal cleanup
/* ICAPModXactLauncher */
ICAPModXactLauncher::ICAPModXactLauncher(ICAPInitiator *anInitiator, HttpMsg *virginHeader, HttpRequest *virginCause, ICAPServiceRep::Pointer &aService):
+ AsyncJob("ICAPModXactLauncher"),
ICAPLauncher("ICAPModXactLauncher", anInitiator, aService)
{
virgin.setHeader(virginHeader);
/*
- * $Id: ICAPModXact.h,v 1.10 2007/08/13 17:20:53 hno Exp $
+ * $Id: ICAPModXact.h,v 1.11 2008/02/12 23:12:45 rousskov Exp $
*
*
* SQUID Web Proxy Cache http://www.squid-cache.org/
ICAPModXact(ICAPInitiator *anInitiator, HttpMsg *virginHeader, HttpRequest *virginCause, ICAPServiceRep::Pointer &s);
// BodyProducer methods
- virtual void noteMoreBodySpaceAvailable(BodyPipe &);
- virtual void noteBodyConsumerAborted(BodyPipe &);
+ virtual void noteMoreBodySpaceAvailable(BodyPipe::Pointer);
+ virtual void noteBodyConsumerAborted(BodyPipe::Pointer);
// BodyConsumer methods
- virtual void noteMoreBodyDataAvailable(BodyPipe &);
- virtual void noteBodyProductionEnded(BodyPipe &);
- virtual void noteBodyProducerAborted(BodyPipe &);
+ virtual void noteMoreBodyDataAvailable(BodyPipe::Pointer);
+ virtual void noteBodyProductionEnded(BodyPipe::Pointer);
+ virtual void noteBodyProducerAborted(BodyPipe::Pointer);
// comm handlers
virtual void handleCommConnected();
ICAPOptXact::ICAPOptXact(ICAPInitiator *anInitiator, ICAPServiceRep::Pointer &aService):
+ AsyncJob("ICAPOptXact"),
ICAPXaction("ICAPOptXact", anInitiator, aService)
{
}
/* ICAPOptXactLauncher */
ICAPOptXactLauncher::ICAPOptXactLauncher(ICAPInitiator *anInitiator, ICAPServiceRep::Pointer &aService):
+ AsyncJob("ICAPOptXactLauncher"),
ICAPLauncher("ICAPOptXactLauncher", anInitiator, aService)
{
}
CBDATA_CLASS_INIT(ICAPServiceRep);
-ICAPServiceRep::ICAPServiceRep(): method(ICAP::methodNone),
+ICAPServiceRep::ICAPServiceRep(): AsyncJob("ICAPServiceRep"), method(ICAP::methodNone),
point(ICAP::pointNone), port(-1), bypass(false),
theOptions(NULL), theOptionsFetcher(0), theLastUpdate(0),
theSessionFailures(0), isSuspended(0), notifying(false),
startGettingOptions();
}
+#if 0
static
void ICAPServiceRep_noteTimeToNotify(void *data)
{
Must(service);
service->noteTimeToNotify();
}
+#endif
void ICAPServiceRep::noteTimeToNotify()
{
while (!theClients.empty()) {
Client i = theClients.pop_back();
- us = i.service; // prevent callbacks from destroying us while we loop
-
- if (cbdataReferenceValid(i.data))
- (*i.callback)(i.data, us);
-
- cbdataReferenceDone(i.data);
+ ScheduleCallHere(i.callback);
+ i.callback = 0;
}
notifying = false;
}
-void ICAPServiceRep::callWhenReady(Callback *cb, void *data)
+void ICAPServiceRep::callWhenReady(AsyncCall::Pointer &cb)
{
- debugs(93,5, HERE << "ICAPService is asked to call " << data <<
+ Must(cb!=NULL);
+
+ debugs(93,5, HERE << "ICAPService is asked to call " << *cb <<
" when ready " << status());
- Must(cb);
Must(self != NULL);
Must(!broken()); // we do not wait for a broken service
Client i;
- i.service = self;
+ i.service = self; // TODO: is this really needed?
i.callback = cb;
- i.data = cbdataReference(data);
theClients.push_back(i);
if (theOptionsFetcher || notifying)
void ICAPServiceRep::scheduleNotification()
{
debugs(93,7, "ICAPService will notify " << theClients.size() << " clients");
- eventAdd("ICAPServiceRep::noteTimeToNotify", &ICAPServiceRep_noteTimeToNotify, this, 0, 0, true);
+ CallJobHere(93, 5, this, ICAPServiceRep::noteTimeToNotify);
}
bool ICAPServiceRep::needNewOptions() const
debugs(93,6, "ICAPService will get new options " << status());
theOptionsFetcher = initiateIcap(new ICAPOptXactLauncher(this, self));
+ Must(theOptionsFetcher);
// TODO: timeout in case ICAPOptXact never calls us back?
// Such a timeout should probably be a generic AsyncStart feature.
}
/*
- * $Id: ICAPServiceRep.h,v 1.11 2007/07/24 16:43:33 rousskov Exp $
+ * $Id: ICAPServiceRep.h,v 1.12 2008/02/12 23:12:45 rousskov Exp $
*
*
* SQUID Web Proxy Cache http://www.squid-cache.org/
bool broken() const; // see comments above
bool up() const; // see comments above
- typedef void Callback(void *data, Pointer &service);
- void callWhenReady(Callback *cb, void *data);
+ void callWhenReady(AsyncCall::Pointer &cb);
// the methods below can only be called on an up() service
bool wantsUrl(const String &urlPath) const;
bool allows204() const;
void noteFailure(); // called by transactions to report service failure
+
+ //AsyncJob virtual methods
+ virtual bool doneAll() const { return ICAPInitiator::doneAll() && false;}
public:
String key;
struct Client
{
Pointer service; // one for each client to preserve service
- Callback *callback;
- void *data;
+ AsyncCall::Pointer callback;
};
typedef Vector<Client> Clients;
#include "squid.h"
#include "comm.h"
+#include "CommCalls.h"
#include "HttpMsg.h"
#include "ICAPXaction.h"
#include "ICAPConfig.h"
static PconnPool *icapPconnPool = new PconnPool("ICAP Servers");
-int ICAPXaction::TheLastId = 0;
//CBDATA_CLASS_INIT(ICAPXaction);
-/* comm module handlers (wrappers around corresponding ICAPXaction methods */
-
-// TODO: Teach comm module to call object methods directly
-
-static
-ICAPXaction &ICAPXaction_fromData(void *data)
-{
- ICAPXaction *x = static_cast<ICAPXaction*>(data);
- assert(x);
- return *x;
-}
-
-static
-void ICAPXaction_noteCommTimedout(int, void *data)
-{
- ICAPXaction_fromData(data).noteCommTimedout();
-}
-
-static
-void ICAPXaction_noteCommClosed(int, void *data)
-{
- ICAPXaction_fromData(data).noteCommClosed();
-}
-
-static
-void ICAPXaction_noteCommConnected(int, comm_err_t status, int xerrno, void *data)
-{
- ICAPXaction_fromData(data).noteCommConnected(status);
-}
-
-static
-void ICAPXaction_noteCommWrote(int, char *, size_t size, comm_err_t status, int xerrno, void *data)
-{
- ICAPXaction_fromData(data).noteCommWrote(status, size);
-}
-
-static
-void ICAPXaction_noteCommRead(int, char *, size_t size, comm_err_t status, int xerrno, void *data)
-{
- debugs(93,3,HERE << data << " read returned " << size);
- ICAPXaction_fromData(data).noteCommRead(status, size);
-}
-
ICAPXaction::ICAPXaction(const char *aTypeName, ICAPInitiator *anInitiator, ICAPServiceRep::Pointer &aService):
+ AsyncJob(aTypeName),
ICAPInitiate(aTypeName, anInitiator, aService),
- id(++TheLastId),
connection(-1),
commBuf(NULL), commBufSize(0),
commEof(false),
connection = icapPconnPool->pop(s.host.buf(), s.port, NULL, client_addr, isRetriable);
if (connection >= 0) {
debugs(93,3, HERE << "reused pconn FD " << connection);
- connector = &ICAPXaction_noteCommConnected; // make doneAll() false
- eventAdd("ICAPXaction::reusedConnection",
- reusedConnection,
- this,
- 0.0,
- 0,
- true);
+
+ // fake the connect callback
+ // TODO: can we sync call ICAPXaction::noteCommConnected here instead?
+ typedef CommCbMemFunT<ICAPXaction, CommConnectCbParams> Dialer;
+ Dialer dialer(this, &ICAPXaction::noteCommConnected);
+ dialer.params.flag = COMM_OK;
+ // fake other parameters by copying from the existing connection
+ connector = asyncCall(93,3, "ICAPXaction::noteCommConnected", dialer);
+ ScheduleCallHere(connector);
return;
}
debugs(93,3, typeName << " opens connection to " << s.host.buf() << ":" << s.port);
// TODO: service bypass status may differ from that of a transaction
- commSetTimeout(connection, TheICAPConfig.connect_timeout(service().bypass),
- &ICAPXaction_noteCommTimedout, this);
+ typedef CommCbMemFunT<ICAPXaction, CommTimeoutCbParams> TimeoutDialer;
+ AsyncCall::Pointer timeoutCall = asyncCall(93, 5, "ICAPXaction::noteCommTimedout",
+ TimeoutDialer(this,&ICAPXaction::noteCommTimedout));
- closer = &ICAPXaction_noteCommClosed;
- comm_add_close_handler(connection, closer, this);
+ commSetTimeout(connection, TheICAPConfig.connect_timeout(service().bypass), timeoutCall);
- connector = &ICAPXaction_noteCommConnected;
- commConnectStart(connection, s.host.buf(), s.port, connector, this);
+ typedef CommCbMemFunT<ICAPXaction, CommCloseCbParams> CloseDialer;
+ closer = asyncCall(93, 5, "ICAPXaction::noteCommClosed",
+ CloseDialer(this,&ICAPXaction::noteCommClosed));
+ comm_add_close_handler(connection, closer);
+
+ typedef CommCbMemFunT<ICAPXaction, CommConnectCbParams> ConnectDialer;
+ connector = asyncCall(93,3, "ICAPXaction::noteCommConnected",
+ ConnectDialer(this, &ICAPXaction::noteCommConnected));
+ commConnectStart(connection, s.host.buf(), s.port, connector);
}
/*
* This event handler is necessary to work around the no-rentry policy
* of ICAPXaction::callStart()
*/
+#if 0
void
ICAPXaction::reusedConnection(void *data)
{
ICAPXaction *x = (ICAPXaction*)data;
x->noteCommConnected(COMM_OK);
}
+#endif
void ICAPXaction::closeConnection()
{
if (connection >= 0) {
- if (closer) {
- comm_remove_close_handler(connection, closer, this);
+ if (closer != NULL) {
+ comm_remove_close_handler(connection, closer);
closer = NULL;
}
if (reuseConnection) {
IPAddress client_addr;
debugs(93,3, HERE << "pushing pconn" << status());
- commSetTimeout(connection, -1, NULL, NULL);
+ AsyncCall::Pointer call = NULL;
+ commSetTimeout(connection, -1, call);
icapPconnPool->push(connection, theService->host.buf(), theService->port, NULL, client_addr);
disableRetries();
} else {
}
// connection with the ICAP service established
-void ICAPXaction::noteCommConnected(comm_err_t commStatus)
+void ICAPXaction::noteCommConnected(const CommConnectCbParams &io)
{
- ICAPXaction_Enter(noteCommConnected);
-
- Must(connector);
+ Must(connector != NULL);
connector = NULL;
- if (commStatus != COMM_OK)
+ if (io.flag != COMM_OK)
dieOnConnectionFailure(); // throws
fd_table[connection].noteUse(icapPconnPool);
handleCommConnected();
-
- ICAPXaction_Exit();
}
void ICAPXaction::dieOnConnectionFailure() {
void ICAPXaction::scheduleWrite(MemBuf &buf)
{
// comm module will free the buffer
- writer = &ICAPXaction_noteCommWrote;
- comm_write_mbuf(connection, &buf, writer, this);
+ typedef CommCbMemFunT<ICAPXaction, CommIoCbParams> Dialer;
+ writer = asyncCall(93,3, "ICAPXaction::noteCommWrote",
+ Dialer(this, &ICAPXaction::noteCommWrote));
+
+ comm_write_mbuf(connection, &buf, writer);
updateTimeout();
}
-void ICAPXaction::noteCommWrote(comm_err_t commStatus, size_t size)
+void ICAPXaction::noteCommWrote(const CommIoCbParams &io)
{
- ICAPXaction_Enter(noteCommWrote);
-
- Must(writer);
+ Must(writer != NULL);
writer = NULL;
if (ignoreLastWrite) {
// a hack due to comm inability to cancel a pending write
ignoreLastWrite = false;
- debugs(93, 7, HERE << "ignoring last write; status: " << commStatus);
+ debugs(93, 7, HERE << "ignoring last write; status: " << io.flag);
} else {
- Must(commStatus == COMM_OK);
+ Must(io.flag == COMM_OK);
updateTimeout();
- handleCommWrote(size);
+ handleCommWrote(io.size);
}
-
- ICAPXaction_Exit();
}
// communication timeout with the ICAP service
-void ICAPXaction::noteCommTimedout()
+void ICAPXaction::noteCommTimedout(const CommTimeoutCbParams &io)
{
- ICAPXaction_Enter(noteCommTimedout);
-
handleCommTimedout();
-
- ICAPXaction_Exit();
}
void ICAPXaction::handleCommTimedout()
reuseConnection = false;
service().noteFailure();
- throw TexcHere(connector ?
+ throw TexcHere(connector != NULL ?
"timed out while connecting to the ICAP service" :
"timed out while talking to the ICAP service");
}
// unexpected connection close while talking to the ICAP service
-void ICAPXaction::noteCommClosed()
+void ICAPXaction::noteCommClosed(const CommCloseCbParams &io)
{
closer = NULL;
- ICAPXaction_Enter(noteCommClosed);
-
handleCommClosed();
-
- ICAPXaction_Exit();
}
void ICAPXaction::handleCommClosed()
}
void ICAPXaction::updateTimeout() {
- if (reader || writer) {
+ if (reader != NULL || writer != NULL) {
// restart the timeout before each I/O
// XXX: why does Config.Timeout lacks a write timeout?
// TODO: service bypass status may differ from that of a transaction
- commSetTimeout(connection, TheICAPConfig.io_timeout(service().bypass),
- &ICAPXaction_noteCommTimedout, this);
+ typedef CommCbMemFunT<ICAPXaction, CommTimeoutCbParams> TimeoutDialer;
+ AsyncCall::Pointer call = asyncCall(93, 5, "ICAPXaction::noteCommTimedout",
+ TimeoutDialer(this,&ICAPXaction::noteCommTimedout));
+
+ commSetTimeout(connection, TheICAPConfig.io_timeout(service().bypass), call);
} else {
// clear timeout when there is no I/O
// Do we need a lifetime timeout?
- commSetTimeout(connection, -1, NULL, NULL);
+ AsyncCall::Pointer call = NULL;
+ commSetTimeout(connection, -1, call);
}
}
Must(!reader);
Must(readBuf.hasSpace());
- reader = &ICAPXaction_noteCommRead;
/*
* See comments in ICAPXaction.h about why we use commBuf
* here instead of reading directly into readBuf.buf.
*/
+ typedef CommCbMemFunT<ICAPXaction, CommIoCbParams> Dialer;
+ reader = asyncCall(93,3, "ICAPXaction::noteCommRead",
+ Dialer(this, &ICAPXaction::noteCommRead));
- comm_read(connection, commBuf, readBuf.spaceSize(), reader, this);
+ comm_read(connection, commBuf, readBuf.spaceSize(), reader);
updateTimeout();
}
// comm module read a portion of the ICAP response for us
-void ICAPXaction::noteCommRead(comm_err_t commStatus, size_t sz)
+void ICAPXaction::noteCommRead(const CommIoCbParams &io)
{
- ICAPXaction_Enter(noteCommRead);
-
- Must(reader);
+ Must(reader != NULL);
reader = NULL;
- Must(commStatus == COMM_OK);
- Must(sz >= 0);
+ Must(io.flag == COMM_OK);
+ Must(io.size >= 0);
updateTimeout();
- debugs(93, 3, HERE << "read " << sz << " bytes");
+ debugs(93, 3, HERE << "read " << io.size << " bytes");
/*
* See comments in ICAPXaction.h about why we use commBuf
* here instead of reading directly into readBuf.buf.
*/
- if (sz > 0) {
- readBuf.append(commBuf, sz);
+ if (io.size > 0) {
+ readBuf.append(commBuf, io.size);
disableRetries(); // because pconn did not fail
} else {
reuseConnection = false;
commEof = true;
}
- handleCommRead(sz);
-
- ICAPXaction_Exit();
+ handleCommRead(io.size);
}
void ICAPXaction::cancelRead()
{
- if (reader) {
- // check callback presence because comm module removes
- // fdc_table[].read.callback after the actual I/O but
- // before we get the callback via a queued event.
- // These checks try to mimic the comm_read_cancel() assertions.
-
- if (comm_has_pending_read(connection) &&
- !comm_has_pending_read_callback(connection)) {
- comm_read_cancel(connection, reader, this);
- reader = NULL;
- }
+ if (reader != NULL) {
+ comm_read_cancel(connection, reader);
+ reader = NULL;
}
}
// initiator aborted
void ICAPXaction::noteInitiatorAborted()
{
- ICAPXaction_Enter(noteInitiatorAborted);
if (theInitiator) {
clearInitiator();
mustStop("initiator aborted");
}
- ICAPXaction_Exit();
}
// This 'last chance' method is called before a 'done' transaction is deleted.
if (connection >= 0) {
buf.Printf("FD %d", connection);
- if (writer)
+ if (writer != NULL)
buf.append("w", 1);
- if (reader)
+ if (reader != NULL)
buf.append("r", 1);
buf.append(";", 1);
/*
- * $Id: ICAPXaction.h,v 1.12 2007/06/19 21:08:33 rousskov Exp $
+ * $Id: ICAPXaction.h,v 1.13 2008/02/12 23:12:46 rousskov Exp $
*
*
* SQUID Web Proxy Cache http://www.squid-cache.org/
#define SQUID_ICAPXACTION_H
#include "comm.h"
+#include "CommCalls.h"
#include "MemBuf.h"
#include "ICAPServiceRep.h"
#include "ICAPInitiate.h"
class HttpMsg;
+class CommConnectCbParams;
/*
* The ICAP Xaction implements common tasks for ICAP OPTIONS, REQMOD, and
void disableRetries();
// comm handler wrappers, treat as private
- void noteCommConnected(comm_err_t status);
- void noteCommWrote(comm_err_t status, size_t sz);
- void noteCommRead(comm_err_t status, size_t sz);
- void noteCommTimedout();
- void noteCommClosed();
+ void noteCommConnected(const CommConnectCbParams &io);
+ void noteCommWrote(const CommIoCbParams &io);
+ void noteCommRead(const CommIoCbParams &io);
+ void noteCommTimedout(const CommTimeoutCbParams &io);
+ void noteCommClosed(const CommCloseCbParams &io);
protected:
virtual void start();
virtual void callEnd();
protected:
- const int id; // transaction ID for debugging, unique across ICAP xactions
-
int connection; // FD of the ICAP server connection
/*
const char *stopReason;
// active (pending) comm callbacks for the ICAP server connection
- CNCB *connector;
- IOCB *reader;
- IOCB *writer;
- PF *closer;
+ AsyncCall::Pointer connector;
+ AsyncCall::Pointer reader;
+ AsyncCall::Pointer writer;
+ AsyncCall::Pointer closer;
private:
- static int TheLastId;
-
- static void reusedConnection(void *data);
-
//CBDATA_CLASS2(ICAPXaction);
};
-// call guards for all "asynchronous" note*() methods
-// If we move ICAPXaction_* macros to core, they can use these generic names:
-#define ICAPXaction_Enter(method) AsyncCallEnter(method)
-#define ICAPXaction_Exit() AsyncCallExit()
-
#endif /* SQUID_ICAPXACTION_H */