From: rousskov <> Date: Wed, 13 Feb 2008 06:12:45 +0000 (+0000) Subject: Merging async-call branch changes to HEAD: X-Git-Tag: BASIC_TPROXY4~108 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=bd7f2edef6ca0e6da0b065091eee53f8d758ba7e;p=thirdparty%2Fsquid.git Merging async-call branch changes to HEAD: Async-call work replaces event-based asynchronous calls with stand-alone implementation. The common async call API allows Squid core do call, debug, and troubleshoot all callback handlers in a uniform way. An async "job" API is introduced to manage independent logical threads or work such as protocol transaction handlers on client, server, and ICAP sides. These jobs should communicate with each other using async calls to minimize dependencies and avoid reentrant callback loops. These changes will eventually improve overall code quality, debugging quality, and Squid robustness. Most of the changes in the ICAP/ICAP* files are related to removing handler wrappers and using CommCalls. AsyncJob is now a [virtual] base class for ICAPInitiator. All ICAP initiators and ICAP transactions are now "jobs". --- diff --git a/src/ICAP/ICAPConfig.cc b/src/ICAP/ICAPConfig.cc index 644e85f783..5aa2d6e2b2 100644 --- a/src/ICAP/ICAPConfig.cc +++ b/src/ICAP/ICAPConfig.cc @@ -1,6 +1,6 @@ /* - * $Id: ICAPConfig.cc,v 1.20 2007/09/27 15:31:15 rousskov Exp $ + * $Id: ICAPConfig.cc,v 1.21 2008/02/12 23:12:45 rousskov Exp $ * * SQUID Web Proxy Cache http://www.squid-cache.org/ * ---------------------------------------------------------- @@ -116,7 +116,7 @@ ICAPAccessCheck::ICAPAccessCheck(ICAP::Method aMethod, HttpRequest *aReq, HttpReply *aRep, ICAPAccessCheckCallback *aCallback, - void *aCallbackData) + void *aCallbackData): AsyncJob("ICAPAccessCheck"), done(FALSE) { method = aMethod; point = aPoint; @@ -223,14 +223,10 @@ ICAPAccessCheck::ICAPAccessCheckCallbackWrapper(int answer, void *data) /* * We use an event here to break deep function call sequences */ - eventAdd("ICAPAccessCheckCallbackEvent", - ICAPAccessCheckCallbackEvent, - ac, - 0.0, - 0, - true); + CallJobHere(93, 5, ac, ICAPAccessCheck::do_callback); } +#if 0 void ICAPAccessCheck::ICAPAccessCheckCallbackEvent(void *data) { @@ -239,6 +235,7 @@ ICAPAccessCheck::ICAPAccessCheckCallbackEvent(void *data) ac->do_callback(); delete ac; } +#endif void ICAPAccessCheck::do_callback() @@ -267,6 +264,7 @@ ICAPAccessCheck::do_callback() } callback(service, validated_cbdata); + done = TRUE; } ICAPServiceRep::Pointer diff --git a/src/ICAP/ICAPConfig.h b/src/ICAP/ICAPConfig.h index eb62c5e7a7..518f227c37 100644 --- a/src/ICAP/ICAPConfig.h +++ b/src/ICAP/ICAPConfig.h @@ -1,6 +1,6 @@ /* - * $Id: ICAPConfig.h,v 1.16 2007/06/28 15:28:59 rousskov Exp $ + * $Id: ICAPConfig.h,v 1.17 2008/02/12 23:12:45 rousskov Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -37,6 +37,7 @@ #define SQUID_ICAPCONFIG_H #include "event.h" +#include "AsyncCall.h" #include "ICAPServiceRep.h" class acl_access; @@ -57,7 +58,7 @@ public: int prepare(); }; -class ICAPAccessCheck +class ICAPAccessCheck: public virtual AsyncJob { public: @@ -77,12 +78,17 @@ private: String matchedClass; void do_callback(); ICAPServiceRep::Pointer findBestService(ICAPClass *c, bool preferUp); + bool done; public: void check(); void checkCandidates(); static void ICAPAccessCheckCallbackWrapper(int, void*); +#if 0 static EVH ICAPAccessCheckCallbackEvent; +#endif +//AsyncJob virtual methods + virtual bool doneAll() const { return AsyncJob::doneAll() && done;} private: CBDATA_CLASS2(ICAPAccessCheck); diff --git a/src/ICAP/ICAPInitiate.cc b/src/ICAP/ICAPInitiate.cc index 1c8ce334bc..70eb087eb6 100644 --- a/src/ICAP/ICAPInitiate.cc +++ b/src/ICAP/ICAPInitiate.cc @@ -7,57 +7,18 @@ #include "ICAPInitiator.h" #include "ICAPInitiate.h" -/* The call objects below are not cbdata-protected or refcounted because - * nobody holds a pointer to them except for the event queue. - * - * The calls do check the Initiator pointer to see if that is still valid. - * - * TODO: convert this to a generic AsyncCall1 class - * TODO: mempool kids of this class. - */ - -/* Event data and callback wrapper to call noteIcapAnswer with - * the answer message as a parameter. - */ -class ICAPAnswerCall { -public: - // use this function to make an asynchronous call - static void Schedule(const ICAPInitiatorHolder &anInitiator, HttpMsg *aMessage); - - static void Wrapper(void *data); - -protected: - ICAPAnswerCall(const ICAPInitiatorHolder &anInitiator, HttpMsg *aMessage); - ~ICAPAnswerCall(); - - void schedule(); - void call(); - - ICAPInitiatorHolder theInitiator; - HttpMsg *theMessage; -}; - - -/* Event data and callback wrapper to call noteIcapQueryAbort with - * the termination status as a parameter. - * - * XXX: This class is a clone of ICAPAnswerCall. - */ -class ICAPQueryAbortCall { +// ICAPInitiator::noteIcapAnswer Dialer locks/unlocks the message in transit +// TODO: replace HTTPMSGLOCK with general RefCounting and delete this class +class IcapAnswerDialer: public UnaryMemFunT +{ public: - // use this function to make an asynchronous call - static void Schedule(const ICAPInitiatorHolder &anInitiator, bool beFinal); - - static void Wrapper(void *data); - -protected: - ICAPQueryAbortCall(const ICAPInitiatorHolder &anInitiator, bool beFinal); - - void schedule(); - void call(); + typedef UnaryMemFunT Parent; - ICAPInitiatorHolder theInitiator; - bool isFinal; + IcapAnswerDialer(ICAPInitiator *obj, Parent::Method meth, HttpMsg *msg): + Parent(obj, meth, msg) { HTTPMSGLOCK(arg1); } + IcapAnswerDialer(const IcapAnswerDialer &d): + Parent(d) { HTTPMSGLOCK(arg1); } + virtual ~IcapAnswerDialer() { HTTPMSGUNLOCK(arg1); } }; @@ -97,13 +58,16 @@ void ICAPInitiate::clearInitiator() void ICAPInitiate::sendAnswer(HttpMsg *msg) { - ICAPAnswerCall::Schedule(theInitiator, msg); + assert(msg); + CallJob(93, 5, __FILE__, __LINE__, "ICAPInitiator::noteIcapAnswer", + IcapAnswerDialer(theInitiator.ptr, &ICAPInitiator::noteIcapAnswer, msg)); clearInitiator(); } + void ICAPInitiate::tellQueryAborted(bool final) { - ICAPQueryAbortCall::Schedule(theInitiator, final); + CallJobHere1(93, 5, theInitiator.ptr, ICAPInitiator::noteIcapQueryAbort, final); clearInitiator(); } @@ -156,122 +120,3 @@ ICAPInitiatorHolder &ICAPInitiatorHolder::operator =(const ICAPInitiatorHolder & assert(false); return *this; } - -/* ICAPAnswerCall */ - -ICAPAnswerCall::ICAPAnswerCall(const ICAPInitiatorHolder &anInitiator, HttpMsg *aMessage): - theInitiator(anInitiator), theMessage(0) -{ - if (theInitiator) { - assert(aMessage); - theMessage = HTTPMSGLOCK(aMessage); - } -} - -void ICAPAnswerCall::schedule() -{ - if (theInitiator) { - debugs(93,3, __FILE__ << "(" << __LINE__ << ") will call " << - theInitiator << "->ICAPInitiator::noteIcapAnswer(" << - theMessage << ")"); - eventAdd("ICAPInitiator::noteIcapAnswer", - &ICAPAnswerCall::Wrapper, this, 0.0, 0, false); - } else { - debugs(93,3, __FILE__ << "(" << __LINE__ << ") will not call " << - theInitiator << "->ICAPInitiator::noteIcapAnswer(" << - theMessage << ")"); - } -} - -ICAPAnswerCall::~ICAPAnswerCall() -{ - if (theInitiator) - HTTPMSGUNLOCK(theMessage); -} - -void ICAPAnswerCall::Wrapper(void *data) -{ - assert(data); - ICAPAnswerCall *c = static_cast(data); - c->call(); - delete c; -} - -void ICAPAnswerCall::call() { - assert(theInitiator); - if (cbdataReferenceValid(theInitiator.cbdata)) { - debugs(93, 3, "entering " << - theInitiator << "->ICAPInitiator::noteIcapAnswer(" << - theMessage << ")"); - theInitiator.ptr->noteIcapAnswer(theMessage); - debugs(93, 3, "exiting " << - theInitiator << "->ICAPInitiator::noteIcapAnswer(" << - theMessage << ")"); - } else { - debugs(93, 3, "ignoring " << - theInitiator << "->ICAPInitiator::noteIcapAnswer(" << - theMessage << ")"); - } -} - -void ICAPAnswerCall::Schedule(const ICAPInitiatorHolder &anInitiator, HttpMsg *aMessage) -{ - ICAPAnswerCall *call = new ICAPAnswerCall(anInitiator, aMessage); - call->schedule(); - // The call object is deleted in ICAPAnswerCall::Wrapper -} - - -/* ICAPQueryAbortCall */ - -ICAPQueryAbortCall::ICAPQueryAbortCall(const ICAPInitiatorHolder &anInitiator, bool beFinal): - theInitiator(anInitiator), isFinal(beFinal) -{ -} - -void ICAPQueryAbortCall::schedule() -{ - if (theInitiator) { - debugs(93,3, __FILE__ << "(" << __LINE__ << ") will call " << - theInitiator << "->ICAPInitiator::noteIcapQueryAbort(" << - isFinal << ")"); - eventAdd("ICAPInitiator::noteIcapQueryAbort", - &ICAPQueryAbortCall::Wrapper, this, 0.0, 0, false); - } else { - debugs(93,3, __FILE__ << "(" << __LINE__ << ") will not call " << - theInitiator << "->ICAPInitiator::noteIcapQueryAbort(" << - isFinal << ")"); - } -} - -void ICAPQueryAbortCall::Wrapper(void *data) -{ - assert(data); - ICAPQueryAbortCall *c = static_cast(data); - c->call(); - delete c; -} - -void ICAPQueryAbortCall::call() { - assert(theInitiator); - if (cbdataReferenceValid(theInitiator.cbdata)) { - debugs(93, 3, "entering " << - theInitiator << "->ICAPInitiator::noteIcapQueryAbort(" << - isFinal << ")"); - theInitiator.ptr->noteIcapQueryAbort(isFinal); - debugs(93, 3, "exiting " << - theInitiator << "->ICAPInitiator::noteIcapQueryAbort(" << - isFinal << ")"); - } else { - debugs(93, 3, "ignoring " << - theInitiator << "->ICAPInitiator::noteIcapQueryAbort(" << - isFinal << ")"); - } -} - -void ICAPQueryAbortCall::Schedule(const ICAPInitiatorHolder &anInitiator, bool beFinal) -{ - ICAPQueryAbortCall *call = new ICAPQueryAbortCall(anInitiator, beFinal); - call->schedule(); - // The call object is deleted in ICAPQueryAbortCall::Wrapper -} diff --git a/src/ICAP/ICAPInitiate.h b/src/ICAP/ICAPInitiate.h index 586c470765..1d3a50b754 100644 --- a/src/ICAP/ICAPInitiate.h +++ b/src/ICAP/ICAPInitiate.h @@ -1,6 +1,6 @@ /* - * $Id: ICAPInitiate.h,v 1.1 2007/05/08 16:32:11 rousskov Exp $ + * $Id: ICAPInitiate.h,v 1.2 2008/02/12 23:12:45 rousskov Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -37,13 +37,14 @@ #include "comm.h" #include "MemBuf.h" #include "ICAPServiceRep.h" -#include "AsyncJob.h" +#include "AsyncCall.h" class HttpMsg; class ICAPInitiator; /* Initiator holder associtates an initiator with its cbdata. It is used as * a temporary hack to make cbdata work with multiple inheritance */ +// TODO: since ICAPInitiator is now an AsyncJob, we do not need this class. class ICAPInitiatorHolder { public: ICAPInitiatorHolder(ICAPInitiator *anInitiator); @@ -77,7 +78,7 @@ private: * * This class could have been named ICAPInitiatee. */ -class ICAPInitiate: public AsyncJob +class ICAPInitiate: virtual public AsyncJob { public: @@ -86,7 +87,6 @@ public: // communication with the initiator virtual void noteInitiatorAborted() = 0; - AsyncCallWrapper(93,3, ICAPInitiate, noteInitiatorAborted) protected: ICAPServiceRep &service(); diff --git a/src/ICAP/ICAPInitiator.cc b/src/ICAP/ICAPInitiator.cc index 401fa2107f..d4a4ad47e9 100644 --- a/src/ICAP/ICAPInitiator.cc +++ b/src/ICAP/ICAPInitiator.cc @@ -7,8 +7,9 @@ #include "ICAPInitiator.h" ICAPInitiate *ICAPInitiator::initiateIcap(ICAPInitiate *x) { - x = cbdataReference(x); - return dynamic_cast(ICAPInitiate::AsyncStart(x)); + if ((x = dynamic_cast(ICAPInitiate::AsyncStart(x)))) + x = cbdataReference(x); + return x; } void ICAPInitiator::clearIcap(ICAPInitiate *&x) { @@ -19,7 +20,7 @@ void ICAPInitiator::clearIcap(ICAPInitiate *&x) { void ICAPInitiator::announceInitiatorAbort(ICAPInitiate *&x) { if (x) { - AsyncCall(93,5, x, ICAPInitiate::noteInitiatorAborted); + CallJobHere(93, 5, x, ICAPInitiate::noteInitiatorAborted); clearIcap(x); } } diff --git a/src/ICAP/ICAPInitiator.h b/src/ICAP/ICAPInitiator.h index 819d3276ee..7dacbcb75b 100644 --- a/src/ICAP/ICAPInitiator.h +++ b/src/ICAP/ICAPInitiator.h @@ -1,6 +1,6 @@ /* - * $Id: ICAPInitiator.h,v 1.2 2007/05/08 16:32:11 rousskov Exp $ + * $Id: ICAPInitiator.h,v 1.3 2008/02/12 23:12:45 rousskov Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -43,12 +43,15 @@ * or aborting an ICAP transaction. */ +#include "AsyncJob.h" + class HttpMsg; class ICAPInitiate; -class ICAPInitiator +class ICAPInitiator: virtual public AsyncJob { public: + ICAPInitiator():AsyncJob("ICAPInitiator"){} virtual ~ICAPInitiator() {} // called when ICAP response headers are successfully interpreted @@ -58,9 +61,6 @@ public: // the final parameter is set to disable bypass or retries virtual void noteIcapQueryAbort(bool final) = 0; - // a temporary cbdata-for-multiple inheritance hack, see ICAPInitiator.cc - virtual void *toCbdata() { return this; } - protected: ICAPInitiate *initiateIcap(ICAPInitiate *x); // locks and returns x diff --git a/src/ICAP/ICAPLauncher.cc b/src/ICAP/ICAPLauncher.cc index b823759eef..de11975b09 100644 --- a/src/ICAP/ICAPLauncher.cc +++ b/src/ICAP/ICAPLauncher.cc @@ -9,7 +9,7 @@ #include "ICAPXaction.h" -ICAPLauncher::ICAPLauncher(const char *aTypeName, ICAPInitiator *anInitiator, ICAPServiceRep::Pointer &aService): +ICAPLauncher::ICAPLauncher(const char *aTypeName, ICAPInitiator *anInitiator, ICAPServiceRep::Pointer &aService):AsyncJob(aTypeName), ICAPInitiate(aTypeName, anInitiator, aService), theXaction(0), theLaunches(0) { @@ -42,30 +42,23 @@ void ICAPLauncher::launchXaction(bool final) void ICAPLauncher::noteIcapAnswer(HttpMsg *message) { - AsyncCallEnter(noteIcapAnswer); - sendAnswer(message); clearIcap(theXaction); Must(done()); - - AsyncCallExit(); + debugs(93,3, HERE << "ICAPLauncher::noteIcapAnswer exiting "); } void ICAPLauncher::noteInitiatorAborted() { - AsyncCallEnter(noteInitiatorAborted); announceInitiatorAbort(theXaction); // propogate to the transaction clearInitiator(); Must(done()); // should be nothing else to do - AsyncCallExit(); } void ICAPLauncher::noteIcapQueryAbort(bool final) { - AsyncCallEnter(noteQueryAbort); - clearIcap(theXaction); // TODO: add more checks from FwdState::checkRetry()? @@ -77,7 +70,6 @@ void ICAPLauncher::noteIcapQueryAbort(bool final) Must(done()); // swanSong will notify the initiator } - AsyncCallExit(); } bool ICAPLauncher::doneAll() const { diff --git a/src/ICAP/ICAPLauncher.h b/src/ICAP/ICAPLauncher.h index 8141287f46..9d5d22d1b6 100644 --- a/src/ICAP/ICAPLauncher.h +++ b/src/ICAP/ICAPLauncher.h @@ -1,6 +1,6 @@ /* - * $Id: ICAPLauncher.h,v 1.1 2007/05/08 16:32:11 rousskov Exp $ + * $Id: ICAPLauncher.h,v 1.2 2008/02/12 23:12:45 rousskov Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -75,9 +75,6 @@ public: virtual void noteIcapAnswer(HttpMsg *message); virtual void noteIcapQueryAbort(bool final); - // a temporary cbdata-for-multiple inheritance hack, see ICAPInitiator.cc - virtual void *toCbdata() { return this; } - protected: // ICAPInitiate API implementation virtual void start(); diff --git a/src/ICAP/ICAPModXact.cc b/src/ICAP/ICAPModXact.cc index 807505d423..be6fa95553 100644 --- a/src/ICAP/ICAPModXact.cc +++ b/src/ICAP/ICAPModXact.cc @@ -39,6 +39,7 @@ ICAPModXact::State::State() ICAPModXact::ICAPModXact(ICAPInitiator *anInitiator, HttpMsg *virginHeader, HttpRequest *virginCause, ICAPServiceRep::Pointer &aService): + AsyncJob("ICAPModXact"), ICAPXaction("ICAPModXact", anInitiator, aService), icapReply(NULL), virginConsumed(0), @@ -83,14 +84,7 @@ void ICAPModXact::start() // XXX: If commConnectStart in startWriting fails, we may get here //_after_ the object got destroyed. Somebody please fix commConnectStart! // TODO: Does re-entrance protection in callStart() solve the above? -} - -static -void ICAPModXact_noteServiceReady(void *data, ICAPServiceRep::Pointer &) -{ - ICAPModXact *x = static_cast(data); - assert(x); - x->noteServiceReady(); + // TODO: Check that comm using AsyncCalls solves this problem. } void ICAPModXact::waitForService() @@ -98,13 +92,13 @@ void ICAPModXact::waitForService() Must(!state.serviceWaiting); debugs(93, 7, "ICAPModXact will wait for the ICAP service" << status()); state.serviceWaiting = true; - service().callWhenReady(&ICAPModXact_noteServiceReady, this); + AsyncCall::Pointer call = asyncCall(93,5, "ICAPModXact::noteServiceReady", + MemFun(this, &ICAPModXact::noteServiceReady)); + service().callWhenReady(call); } void ICAPModXact::noteServiceReady() { - ICAPXaction_Enter(noteServiceReady); - Must(state.serviceWaiting); state.serviceWaiting = false; @@ -114,8 +108,6 @@ void ICAPModXact::noteServiceReady() disableRetries(); throw TexcHere("ICAP service is unusable"); } - - ICAPXaction_Exit(); } void ICAPModXact::startWriting() @@ -181,7 +173,7 @@ void ICAPModXact::writeMore() { debugs(93, 5, HERE << "checking whether to write more" << status()); - if (writer) // already writing something + if (writer != NULL) // already writing something return; switch (state.writing) { @@ -413,7 +405,7 @@ void ICAPModXact::stopWriting(bool nicely) if (state.writing == State::writingReallyDone) return; - if (writer) { + if (writer != NULL) { if (nicely) { debugs(93, 7, HERE << "will wait for the last write" << status()); state.writing = State::writingAlmostDone; // may already be set @@ -470,7 +462,7 @@ void ICAPModXact::startReading() void ICAPModXact::readMore() { - if (reader || doneReading()) { + if (reader != NULL || doneReading()) { debugs(93,3,HERE << "returning from readMore because reader or doneReading()"); return; } @@ -982,23 +974,17 @@ void ICAPModXact::stopParsing() } // HTTP side added virgin body data -void ICAPModXact::noteMoreBodyDataAvailable(BodyPipe &) +void ICAPModXact::noteMoreBodyDataAvailable(BodyPipe::Pointer) { - ICAPXaction_Enter(noteMoreBodyDataAvailable); - writeMore(); if (state.sending == State::sendingVirgin) echoMore(); - - ICAPXaction_Exit(); } // HTTP side sent us all virgin info -void ICAPModXact::noteBodyProductionEnded(BodyPipe &) +void ICAPModXact::noteBodyProductionEnded(BodyPipe::Pointer) { - ICAPXaction_Enter(noteBodyProductionEnded); - Must(virgin.body_pipe->productionEnded()); // push writer and sender in case we were waiting for the last-chunk @@ -1006,16 +992,12 @@ void ICAPModXact::noteBodyProductionEnded(BodyPipe &) if (state.sending == State::sendingVirgin) echoMore(); - - ICAPXaction_Exit(); } // body producer aborted, but the initiator may still want to know // the answer, even though the HTTP message has been truncated -void ICAPModXact::noteBodyProducerAborted(BodyPipe &) +void ICAPModXact::noteBodyProducerAborted(BodyPipe::Pointer) { - ICAPXaction_Enter(noteBodyProducerAborted); - Must(virgin.body_pipe->productionEnded()); // push writer and sender in case we were waiting for the last-chunk @@ -1023,34 +1005,24 @@ void ICAPModXact::noteBodyProducerAborted(BodyPipe &) if (state.sending == State::sendingVirgin) echoMore(); - - ICAPXaction_Exit(); } // adapted body consumer wants more adapted data and // possibly freed some buffer space -void ICAPModXact::noteMoreBodySpaceAvailable(BodyPipe &) +void ICAPModXact::noteMoreBodySpaceAvailable(BodyPipe::Pointer) { - ICAPXaction_Enter(noteMoreBodySpaceAvailable); - if (state.sending == State::sendingVirgin) echoMore(); else if (state.sending == State::sendingAdapted) parseMore(); else Must(state.sending == State::sendingUndecided); - - ICAPXaction_Exit(); } // adapted body consumer aborted -void ICAPModXact::noteBodyConsumerAborted(BodyPipe &) +void ICAPModXact::noteBodyConsumerAborted(BodyPipe::Pointer) { - ICAPXaction_Enter(noteBodyConsumerAborted); - mustStop("adapted body consumer aborted"); - - ICAPXaction_Exit(); } // internal cleanup @@ -1569,6 +1541,7 @@ bool ICAPModXact::fillVirginHttpHeader(MemBuf &mb) const /* ICAPModXactLauncher */ ICAPModXactLauncher::ICAPModXactLauncher(ICAPInitiator *anInitiator, HttpMsg *virginHeader, HttpRequest *virginCause, ICAPServiceRep::Pointer &aService): + AsyncJob("ICAPModXactLauncher"), ICAPLauncher("ICAPModXactLauncher", anInitiator, aService) { virgin.setHeader(virginHeader); diff --git a/src/ICAP/ICAPModXact.h b/src/ICAP/ICAPModXact.h index 4127fd2f39..ebb69e1b3a 100644 --- a/src/ICAP/ICAPModXact.h +++ b/src/ICAP/ICAPModXact.h @@ -1,6 +1,6 @@ /* - * $Id: ICAPModXact.h,v 1.10 2007/08/13 17:20:53 hno Exp $ + * $Id: ICAPModXact.h,v 1.11 2008/02/12 23:12:45 rousskov Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -136,13 +136,13 @@ public: ICAPModXact(ICAPInitiator *anInitiator, HttpMsg *virginHeader, HttpRequest *virginCause, ICAPServiceRep::Pointer &s); // BodyProducer methods - virtual void noteMoreBodySpaceAvailable(BodyPipe &); - virtual void noteBodyConsumerAborted(BodyPipe &); + virtual void noteMoreBodySpaceAvailable(BodyPipe::Pointer); + virtual void noteBodyConsumerAborted(BodyPipe::Pointer); // BodyConsumer methods - virtual void noteMoreBodyDataAvailable(BodyPipe &); - virtual void noteBodyProductionEnded(BodyPipe &); - virtual void noteBodyProducerAborted(BodyPipe &); + virtual void noteMoreBodyDataAvailable(BodyPipe::Pointer); + virtual void noteBodyProductionEnded(BodyPipe::Pointer); + virtual void noteBodyProducerAborted(BodyPipe::Pointer); // comm handlers virtual void handleCommConnected(); diff --git a/src/ICAP/ICAPOptXact.cc b/src/ICAP/ICAPOptXact.cc index da0dc719ba..644e06f5d8 100644 --- a/src/ICAP/ICAPOptXact.cc +++ b/src/ICAP/ICAPOptXact.cc @@ -15,6 +15,7 @@ CBDATA_CLASS_INIT(ICAPOptXactLauncher); ICAPOptXact::ICAPOptXact(ICAPInitiator *anInitiator, ICAPServiceRep::Pointer &aService): + AsyncJob("ICAPOptXact"), ICAPXaction("ICAPOptXact", anInitiator, aService) { } @@ -88,6 +89,7 @@ HttpMsg *ICAPOptXact::parseResponse() /* ICAPOptXactLauncher */ ICAPOptXactLauncher::ICAPOptXactLauncher(ICAPInitiator *anInitiator, ICAPServiceRep::Pointer &aService): + AsyncJob("ICAPOptXactLauncher"), ICAPLauncher("ICAPOptXactLauncher", anInitiator, aService) { } diff --git a/src/ICAP/ICAPServiceRep.cc b/src/ICAP/ICAPServiceRep.cc index a25acf7206..224abea48a 100644 --- a/src/ICAP/ICAPServiceRep.cc +++ b/src/ICAP/ICAPServiceRep.cc @@ -14,7 +14,7 @@ CBDATA_CLASS_INIT(ICAPServiceRep); -ICAPServiceRep::ICAPServiceRep(): method(ICAP::methodNone), +ICAPServiceRep::ICAPServiceRep(): AsyncJob("ICAPServiceRep"), method(ICAP::methodNone), point(ICAP::pointNone), port(-1), bypass(false), theOptions(NULL), theOptionsFetcher(0), theLastUpdate(0), theSessionFailures(0), isSuspended(0), notifying(false), @@ -270,6 +270,7 @@ void ICAPServiceRep::noteTimeToUpdate() startGettingOptions(); } +#if 0 static void ICAPServiceRep_noteTimeToNotify(void *data) { @@ -277,6 +278,7 @@ void ICAPServiceRep_noteTimeToNotify(void *data) Must(service); service->noteTimeToNotify(); } +#endif void ICAPServiceRep::noteTimeToNotify() { @@ -291,30 +293,26 @@ void ICAPServiceRep::noteTimeToNotify() while (!theClients.empty()) { Client i = theClients.pop_back(); - us = i.service; // prevent callbacks from destroying us while we loop - - if (cbdataReferenceValid(i.data)) - (*i.callback)(i.data, us); - - cbdataReferenceDone(i.data); + ScheduleCallHere(i.callback); + i.callback = 0; } notifying = false; } -void ICAPServiceRep::callWhenReady(Callback *cb, void *data) +void ICAPServiceRep::callWhenReady(AsyncCall::Pointer &cb) { - debugs(93,5, HERE << "ICAPService is asked to call " << data << + Must(cb!=NULL); + + debugs(93,5, HERE << "ICAPService is asked to call " << *cb << " when ready " << status()); - Must(cb); Must(self != NULL); Must(!broken()); // we do not wait for a broken service Client i; - i.service = self; + i.service = self; // TODO: is this really needed? i.callback = cb; - i.data = cbdataReference(data); theClients.push_back(i); if (theOptionsFetcher || notifying) @@ -329,7 +327,7 @@ void ICAPServiceRep::callWhenReady(Callback *cb, void *data) void ICAPServiceRep::scheduleNotification() { debugs(93,7, "ICAPService will notify " << theClients.size() << " clients"); - eventAdd("ICAPServiceRep::noteTimeToNotify", &ICAPServiceRep_noteTimeToNotify, this, 0, 0, true); + CallJobHere(93, 5, this, ICAPServiceRep::noteTimeToNotify); } bool ICAPServiceRep::needNewOptions() const @@ -466,6 +464,7 @@ void ICAPServiceRep::startGettingOptions() debugs(93,6, "ICAPService will get new options " << status()); theOptionsFetcher = initiateIcap(new ICAPOptXactLauncher(this, self)); + Must(theOptionsFetcher); // TODO: timeout in case ICAPOptXact never calls us back? // Such a timeout should probably be a generic AsyncStart feature. } diff --git a/src/ICAP/ICAPServiceRep.h b/src/ICAP/ICAPServiceRep.h index a71888e7d5..f90cc7cf52 100644 --- a/src/ICAP/ICAPServiceRep.h +++ b/src/ICAP/ICAPServiceRep.h @@ -1,6 +1,6 @@ /* - * $Id: ICAPServiceRep.h,v 1.11 2007/07/24 16:43:33 rousskov Exp $ + * $Id: ICAPServiceRep.h,v 1.12 2008/02/12 23:12:45 rousskov Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -90,8 +90,7 @@ public: bool broken() const; // see comments above bool up() const; // see comments above - typedef void Callback(void *data, Pointer &service); - void callWhenReady(Callback *cb, void *data); + void callWhenReady(AsyncCall::Pointer &cb); // the methods below can only be called on an up() service bool wantsUrl(const String &urlPath) const; @@ -99,6 +98,9 @@ public: bool allows204() const; void noteFailure(); // called by transactions to report service failure + + //AsyncJob virtual methods + virtual bool doneAll() const { return ICAPInitiator::doneAll() && false;} public: String key; @@ -128,8 +130,7 @@ private: struct Client { Pointer service; // one for each client to preserve service - Callback *callback; - void *data; + AsyncCall::Pointer callback; }; typedef Vector Clients; diff --git a/src/ICAP/ICAPXaction.cc b/src/ICAP/ICAPXaction.cc index 9fab451726..d71114e8c9 100644 --- a/src/ICAP/ICAPXaction.cc +++ b/src/ICAP/ICAPXaction.cc @@ -4,6 +4,7 @@ #include "squid.h" #include "comm.h" +#include "CommCalls.h" #include "HttpMsg.h" #include "ICAPXaction.h" #include "ICAPConfig.h" @@ -13,56 +14,12 @@ static PconnPool *icapPconnPool = new PconnPool("ICAP Servers"); -int ICAPXaction::TheLastId = 0; //CBDATA_CLASS_INIT(ICAPXaction); -/* comm module handlers (wrappers around corresponding ICAPXaction methods */ - -// TODO: Teach comm module to call object methods directly - -static -ICAPXaction &ICAPXaction_fromData(void *data) -{ - ICAPXaction *x = static_cast(data); - assert(x); - return *x; -} - -static -void ICAPXaction_noteCommTimedout(int, void *data) -{ - ICAPXaction_fromData(data).noteCommTimedout(); -} - -static -void ICAPXaction_noteCommClosed(int, void *data) -{ - ICAPXaction_fromData(data).noteCommClosed(); -} - -static -void ICAPXaction_noteCommConnected(int, comm_err_t status, int xerrno, void *data) -{ - ICAPXaction_fromData(data).noteCommConnected(status); -} - -static -void ICAPXaction_noteCommWrote(int, char *, size_t size, comm_err_t status, int xerrno, void *data) -{ - ICAPXaction_fromData(data).noteCommWrote(status, size); -} - -static -void ICAPXaction_noteCommRead(int, char *, size_t size, comm_err_t status, int xerrno, void *data) -{ - debugs(93,3,HERE << data << " read returned " << size); - ICAPXaction_fromData(data).noteCommRead(status, size); -} - ICAPXaction::ICAPXaction(const char *aTypeName, ICAPInitiator *anInitiator, ICAPServiceRep::Pointer &aService): + AsyncJob(aTypeName), ICAPInitiate(aTypeName, anInitiator, aService), - id(++TheLastId), connection(-1), commBuf(NULL), commBufSize(0), commEof(false), @@ -113,13 +70,15 @@ void ICAPXaction::openConnection() connection = icapPconnPool->pop(s.host.buf(), s.port, NULL, client_addr, isRetriable); if (connection >= 0) { debugs(93,3, HERE << "reused pconn FD " << connection); - connector = &ICAPXaction_noteCommConnected; // make doneAll() false - eventAdd("ICAPXaction::reusedConnection", - reusedConnection, - this, - 0.0, - 0, - true); + + // fake the connect callback + // TODO: can we sync call ICAPXaction::noteCommConnected here instead? + typedef CommCbMemFunT Dialer; + Dialer dialer(this, &ICAPXaction::noteCommConnected); + dialer.params.flag = COMM_OK; + // fake other parameters by copying from the existing connection + connector = asyncCall(93,3, "ICAPXaction::noteCommConnected", dialer); + ScheduleCallHere(connector); return; } @@ -135,20 +94,28 @@ void ICAPXaction::openConnection() debugs(93,3, typeName << " opens connection to " << s.host.buf() << ":" << s.port); // TODO: service bypass status may differ from that of a transaction - commSetTimeout(connection, TheICAPConfig.connect_timeout(service().bypass), - &ICAPXaction_noteCommTimedout, this); + typedef CommCbMemFunT TimeoutDialer; + AsyncCall::Pointer timeoutCall = asyncCall(93, 5, "ICAPXaction::noteCommTimedout", + TimeoutDialer(this,&ICAPXaction::noteCommTimedout)); - closer = &ICAPXaction_noteCommClosed; - comm_add_close_handler(connection, closer, this); + commSetTimeout(connection, TheICAPConfig.connect_timeout(service().bypass), timeoutCall); - connector = &ICAPXaction_noteCommConnected; - commConnectStart(connection, s.host.buf(), s.port, connector, this); + typedef CommCbMemFunT CloseDialer; + closer = asyncCall(93, 5, "ICAPXaction::noteCommClosed", + CloseDialer(this,&ICAPXaction::noteCommClosed)); + comm_add_close_handler(connection, closer); + + typedef CommCbMemFunT ConnectDialer; + connector = asyncCall(93,3, "ICAPXaction::noteCommConnected", + ConnectDialer(this, &ICAPXaction::noteCommConnected)); + commConnectStart(connection, s.host.buf(), s.port, connector); } /* * This event handler is necessary to work around the no-rentry policy * of ICAPXaction::callStart() */ +#if 0 void ICAPXaction::reusedConnection(void *data) { @@ -156,13 +123,14 @@ ICAPXaction::reusedConnection(void *data) ICAPXaction *x = (ICAPXaction*)data; x->noteCommConnected(COMM_OK); } +#endif void ICAPXaction::closeConnection() { if (connection >= 0) { - if (closer) { - comm_remove_close_handler(connection, closer, this); + if (closer != NULL) { + comm_remove_close_handler(connection, closer); closer = NULL; } @@ -176,7 +144,8 @@ void ICAPXaction::closeConnection() if (reuseConnection) { IPAddress client_addr; debugs(93,3, HERE << "pushing pconn" << status()); - commSetTimeout(connection, -1, NULL, NULL); + AsyncCall::Pointer call = NULL; + commSetTimeout(connection, -1, call); icapPconnPool->push(connection, theService->host.buf(), theService->port, NULL, client_addr); disableRetries(); } else { @@ -193,21 +162,17 @@ void ICAPXaction::closeConnection() } // connection with the ICAP service established -void ICAPXaction::noteCommConnected(comm_err_t commStatus) +void ICAPXaction::noteCommConnected(const CommConnectCbParams &io) { - ICAPXaction_Enter(noteCommConnected); - - Must(connector); + Must(connector != NULL); connector = NULL; - if (commStatus != COMM_OK) + if (io.flag != COMM_OK) dieOnConnectionFailure(); // throws fd_table[connection].noteUse(icapPconnPool); handleCommConnected(); - - ICAPXaction_Exit(); } void ICAPXaction::dieOnConnectionFailure() { @@ -220,39 +185,34 @@ void ICAPXaction::dieOnConnectionFailure() { void ICAPXaction::scheduleWrite(MemBuf &buf) { // comm module will free the buffer - writer = &ICAPXaction_noteCommWrote; - comm_write_mbuf(connection, &buf, writer, this); + typedef CommCbMemFunT Dialer; + writer = asyncCall(93,3, "ICAPXaction::noteCommWrote", + Dialer(this, &ICAPXaction::noteCommWrote)); + + comm_write_mbuf(connection, &buf, writer); updateTimeout(); } -void ICAPXaction::noteCommWrote(comm_err_t commStatus, size_t size) +void ICAPXaction::noteCommWrote(const CommIoCbParams &io) { - ICAPXaction_Enter(noteCommWrote); - - Must(writer); + Must(writer != NULL); writer = NULL; if (ignoreLastWrite) { // a hack due to comm inability to cancel a pending write ignoreLastWrite = false; - debugs(93, 7, HERE << "ignoring last write; status: " << commStatus); + debugs(93, 7, HERE << "ignoring last write; status: " << io.flag); } else { - Must(commStatus == COMM_OK); + Must(io.flag == COMM_OK); updateTimeout(); - handleCommWrote(size); + handleCommWrote(io.size); } - - ICAPXaction_Exit(); } // communication timeout with the ICAP service -void ICAPXaction::noteCommTimedout() +void ICAPXaction::noteCommTimedout(const CommTimeoutCbParams &io) { - ICAPXaction_Enter(noteCommTimedout); - handleCommTimedout(); - - ICAPXaction_Exit(); } void ICAPXaction::handleCommTimedout() @@ -262,20 +222,16 @@ void ICAPXaction::handleCommTimedout() reuseConnection = false; service().noteFailure(); - throw TexcHere(connector ? + throw TexcHere(connector != NULL ? "timed out while connecting to the ICAP service" : "timed out while talking to the ICAP service"); } // unexpected connection close while talking to the ICAP service -void ICAPXaction::noteCommClosed() +void ICAPXaction::noteCommClosed(const CommCloseCbParams &io) { closer = NULL; - ICAPXaction_Enter(noteCommClosed); - handleCommClosed(); - - ICAPXaction_Exit(); } void ICAPXaction::handleCommClosed() @@ -298,16 +254,20 @@ bool ICAPXaction::doneAll() const } void ICAPXaction::updateTimeout() { - if (reader || writer) { + if (reader != NULL || writer != NULL) { // restart the timeout before each I/O // XXX: why does Config.Timeout lacks a write timeout? // TODO: service bypass status may differ from that of a transaction - commSetTimeout(connection, TheICAPConfig.io_timeout(service().bypass), - &ICAPXaction_noteCommTimedout, this); + typedef CommCbMemFunT TimeoutDialer; + AsyncCall::Pointer call = asyncCall(93, 5, "ICAPXaction::noteCommTimedout", + TimeoutDialer(this,&ICAPXaction::noteCommTimedout)); + + commSetTimeout(connection, TheICAPConfig.io_timeout(service().bypass), call); } else { // clear timeout when there is no I/O // Do we need a lifetime timeout? - commSetTimeout(connection, -1, NULL, NULL); + AsyncCall::Pointer call = NULL; + commSetTimeout(connection, -1, call); } } @@ -317,62 +277,52 @@ void ICAPXaction::scheduleRead() Must(!reader); Must(readBuf.hasSpace()); - reader = &ICAPXaction_noteCommRead; /* * See comments in ICAPXaction.h about why we use commBuf * here instead of reading directly into readBuf.buf. */ + typedef CommCbMemFunT Dialer; + reader = asyncCall(93,3, "ICAPXaction::noteCommRead", + Dialer(this, &ICAPXaction::noteCommRead)); - comm_read(connection, commBuf, readBuf.spaceSize(), reader, this); + comm_read(connection, commBuf, readBuf.spaceSize(), reader); updateTimeout(); } // comm module read a portion of the ICAP response for us -void ICAPXaction::noteCommRead(comm_err_t commStatus, size_t sz) +void ICAPXaction::noteCommRead(const CommIoCbParams &io) { - ICAPXaction_Enter(noteCommRead); - - Must(reader); + Must(reader != NULL); reader = NULL; - Must(commStatus == COMM_OK); - Must(sz >= 0); + Must(io.flag == COMM_OK); + Must(io.size >= 0); updateTimeout(); - debugs(93, 3, HERE << "read " << sz << " bytes"); + debugs(93, 3, HERE << "read " << io.size << " bytes"); /* * See comments in ICAPXaction.h about why we use commBuf * here instead of reading directly into readBuf.buf. */ - if (sz > 0) { - readBuf.append(commBuf, sz); + if (io.size > 0) { + readBuf.append(commBuf, io.size); disableRetries(); // because pconn did not fail } else { reuseConnection = false; commEof = true; } - handleCommRead(sz); - - ICAPXaction_Exit(); + handleCommRead(io.size); } void ICAPXaction::cancelRead() { - if (reader) { - // check callback presence because comm module removes - // fdc_table[].read.callback after the actual I/O but - // before we get the callback via a queued event. - // These checks try to mimic the comm_read_cancel() assertions. - - if (comm_has_pending_read(connection) && - !comm_has_pending_read_callback(connection)) { - comm_read_cancel(connection, reader, this); - reader = NULL; - } + if (reader != NULL) { + comm_read_cancel(connection, reader); + reader = NULL; } } @@ -420,14 +370,12 @@ bool ICAPXaction::doneWithIo() const // initiator aborted void ICAPXaction::noteInitiatorAborted() { - ICAPXaction_Enter(noteInitiatorAborted); if (theInitiator) { clearInitiator(); mustStop("initiator aborted"); } - ICAPXaction_Exit(); } // This 'last chance' method is called before a 'done' transaction is deleted. @@ -475,10 +423,10 @@ void ICAPXaction::fillPendingStatus(MemBuf &buf) const if (connection >= 0) { buf.Printf("FD %d", connection); - if (writer) + if (writer != NULL) buf.append("w", 1); - if (reader) + if (reader != NULL) buf.append("r", 1); buf.append(";", 1); diff --git a/src/ICAP/ICAPXaction.h b/src/ICAP/ICAPXaction.h index d1485274f8..4c04a12da6 100644 --- a/src/ICAP/ICAPXaction.h +++ b/src/ICAP/ICAPXaction.h @@ -1,6 +1,6 @@ /* - * $Id: ICAPXaction.h,v 1.12 2007/06/19 21:08:33 rousskov Exp $ + * $Id: ICAPXaction.h,v 1.13 2008/02/12 23:12:46 rousskov Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -35,11 +35,13 @@ #define SQUID_ICAPXACTION_H #include "comm.h" +#include "CommCalls.h" #include "MemBuf.h" #include "ICAPServiceRep.h" #include "ICAPInitiate.h" class HttpMsg; +class CommConnectCbParams; /* * The ICAP Xaction implements common tasks for ICAP OPTIONS, REQMOD, and @@ -61,11 +63,11 @@ public: void disableRetries(); // comm handler wrappers, treat as private - void noteCommConnected(comm_err_t status); - void noteCommWrote(comm_err_t status, size_t sz); - void noteCommRead(comm_err_t status, size_t sz); - void noteCommTimedout(); - void noteCommClosed(); + void noteCommConnected(const CommConnectCbParams &io); + void noteCommWrote(const CommIoCbParams &io); + void noteCommRead(const CommIoCbParams &io); + void noteCommTimedout(const CommTimeoutCbParams &io); + void noteCommClosed(const CommCloseCbParams &io); protected: virtual void start(); @@ -111,8 +113,6 @@ protected: virtual void callEnd(); protected: - const int id; // transaction ID for debugging, unique across ICAP xactions - int connection; // FD of the ICAP server connection /* @@ -137,22 +137,13 @@ protected: const char *stopReason; // active (pending) comm callbacks for the ICAP server connection - CNCB *connector; - IOCB *reader; - IOCB *writer; - PF *closer; + AsyncCall::Pointer connector; + AsyncCall::Pointer reader; + AsyncCall::Pointer writer; + AsyncCall::Pointer closer; private: - static int TheLastId; - - static void reusedConnection(void *data); - //CBDATA_CLASS2(ICAPXaction); }; -// call guards for all "asynchronous" note*() methods -// If we move ICAPXaction_* macros to core, they can use these generic names: -#define ICAPXaction_Enter(method) AsyncCallEnter(method) -#define ICAPXaction_Exit() AsyncCallExit() - #endif /* SQUID_ICAPXACTION_H */