if test -n "$PKG_CONFIG"; then
dnl eCAP support requires libecap.
- dnl This Squid supports libecap v0.2.x.
+ dnl This Squid supports libecap v1.0.x.
dnl Use EXT_ prefix to distinguish external libecap (that we check for
dnl here) from our own convenience ecap library in Makefiles.
- PKG_CHECK_MODULES([EXT_LIBECAP],[libecap >= 0.2.0 libecap < 0.3])
+ PKG_CHECK_MODULES([EXT_LIBECAP],[libecap >= 1.0 libecap < 1.1])
else
AC_MSG_NOTICE([eCAP support requires pkg-config to verify the correct library version. Trouble may follow.])
fi
#include "EventLoop.h"
#include "SquidTime.h"
+EventLoop *EventLoop::Running = NULL;
+
EventLoop::EventLoop() : errcount(0), last_loop(false), timeService(NULL),
primaryEngine(NULL),
loop_delay(EVENT_LOOP_TIMEOUT),
{
prepareToRun();
+ assert(!Running);
+ Running = this;
+
while (!runOnce());
+
+ Running = NULL;
}
bool
int errcount;
+ /// the [main program] loop running now; may be nil
+ /// for simplicity, we assume there are no concurrent loops
+ static EventLoop *Running;
+
private:
/** setup state variables prior to running */
void prepareToRun();
tests/stub_main_cc.cc \
tests/stub_ipc_Forwarder.cc \
tests/stub_store_stats.cc \
+ tests/stub_EventLoop.cc \
time.cc \
BodyPipe.cc \
cache_manager.cc \
tests/stub_main_cc.cc \
tests/stub_MemStore.cc \
tests/stub_store_stats.cc \
+ tests/stub_EventLoop.cc \
time.cc \
tools.h \
tools.cc \
tests/stub_ipc_Forwarder.cc \
tests/stub_libeui.cc \
tests/stub_store_stats.cc \
+ tests/stub_EventLoop.cc \
time.cc \
BodyPipe.cc \
cache_manager.cc \
tests/testURLScheme.h \
tests/testMain.cc \
tests/stub_time.cc \
+ tests/stub_EventLoop.cc \
tools.h \
tools.cc \
tests/stub_tunnel.cc \
os << PACKAGE_NAME << " v" << PACKAGE_VERSION;
}
+/// Strips libecap version components not affecting compatibility decisions.
+static SBuf
+EssentialVersion(const SBuf &raw)
+{
+ // all libecap x.y.* releases are supposed to be compatible so we strip
+ // everything after the second period
+ const SBuf::size_type minorPos = raw.find('.');
+ const SBuf::size_type microPos = minorPos == SBuf::npos ?
+ SBuf::npos : raw.find('.', minorPos+1);
+ return raw.substr(0, microPos); // becomes raw if microPos is npos
+}
+
+/// If "their" libecap version is not compatible with what Squid has been built
+/// with, then complain and return false.
+static bool
+SupportedVersion(const char *vTheir, const char *them)
+{
+ if (!vTheir || !*vTheir) {
+ debugs(93, DBG_CRITICAL, "ERROR: Cannot use " << them <<
+ " with libecap prior to v1.0.");
+ return false;
+ }
+
+ // we support what we are built with
+ const SBuf vSupported(LIBECAP_VERSION);
+ debugs(93, 2, them << " with libecap v" << vTheir << "; us: v" << vSupported);
+
+ if (EssentialVersion(SBuf(vTheir)) == EssentialVersion(vSupported))
+ return true; // their version is supported
+
+ debugs(93, DBG_CRITICAL, "ERROR: Cannot use " << them <<
+ " with libecap v" << vTheir <<
+ ": incompatible with supported libecap v" << vSupported);
+ return false;
+}
+
void
-Adaptation::Ecap::Host::noteService(const libecap::weak_ptr<libecap::adapter::Service> &weak)
+Adaptation::Ecap::Host::noteVersionedService(const char *vGiven, const libecap::weak_ptr<libecap::adapter::Service> &weak)
{
- Must(!weak.expired());
- RegisterAdapterService(weak.lock());
+ /*
+ * Check that libecap used to build the service is compatible with ours.
+ * This has to be done using vGiven string and not Service object itself
+ * because dereferencing a Service pointer coming from an unsupported
+ * version is unsafe.
+ */
+ if (SupportedVersion(vGiven, "eCAP service built")) {
+ Must(!weak.expired());
+ RegisterAdapterService(weak.lock());
+ }
}
static int
void
Adaptation::Ecap::Host::Register()
{
- if (!TheHost) {
+ if (!TheHost && SupportedVersion(libecap::VersionString(),
+ "Squid executable dynamically linked")) {
TheHost.reset(new Adaptation::Ecap::Host);
libecap::RegisterHost(TheHost);
}
/* libecap::host::Host API */
virtual std::string uri() const; // unique across all vendors
virtual void describe(std::ostream &os) const; // free-format info
- virtual void noteService(const libecap::weak_ptr<libecap::adapter::Service> &s);
+ virtual void noteVersionedService(const char *libEcapVersion, const libecap::weak_ptr<libecap::adapter::Service> &s);
virtual std::ostream *openDebug(libecap::LogVerbosity lv);
virtual void closeDebug(std::ostream *debug);
typedef libecap::shared_ptr<libecap::Message> MessagePtr;
*/
#include "squid.h"
#include "Debug.h"
-#include <list>
-#include <libecap/adapter/service.h>
-#include <libecap/common/options.h>
-#include <libecap/common/name.h>
-#include <libecap/common/named_values.h>
+#include "EventLoop.h"
#include "adaptation/ecap/Config.h"
#include "adaptation/ecap/Host.h"
#include "adaptation/ecap/ServiceRep.h"
#include "adaptation/ecap/XactionRep.h"
+#include "AsyncEngine.h"
#include "base/TextException.h"
-// configured eCAP service wrappers
-static std::list<Adaptation::Ecap::ServiceRep::AdapterService> TheServices;
+#include <libecap/adapter/service.h>
+#include <libecap/common/options.h>
+#include <libecap/common/name.h>
+#include <libecap/common/named_values.h>
+#if HAVE_LIMITS
+#include <limits>
+#endif
+#include <map>
+
+/// libecap::adapter::services indexed by their URI
+typedef std::map<std::string, Adaptation::Ecap::ServiceRep::AdapterService> AdapterServices;
+/// all loaded services
+static AdapterServices TheServices;
+/// configured services producing async transactions
+static AdapterServices AsyncServices;
namespace Adaptation
{
const Master &master; ///< the configuration being wrapped
};
+/// manages async eCAP transactions
+class Engine: public AsyncEngine
+{
+public:
+ /* AsyncEngine API */
+ virtual int checkEvents(int timeout);
+
+private:
+ void kickAsyncServices(timeval &timeout);
+};
+
} // namespace Ecap
} // namespace Adaptation
visitor.visit(Name(i->first), Area::FromTempString(i->second));
}
+/* Adaptation::Ecap::Engine */
+
+int
+Adaptation::Ecap::Engine::checkEvents(int)
+{
+ // Start with the default I/O loop timeout, convert from milliseconds.
+ static const struct timeval maxTimeout {
+ EVENT_LOOP_TIMEOUT/1000, // seconds
+ (EVENT_LOOP_TIMEOUT % 1000)*1000
+ }; // microseconds
+ struct timeval timeout = maxTimeout;
+
+ kickAsyncServices(timeout);
+ if (timeout.tv_sec == maxTimeout.tv_sec && timeout.tv_usec == maxTimeout.tv_usec)
+ return EVENT_IDLE;
+
+ debugs(93, 7, "timeout: " << timeout.tv_sec << "s+" << timeout.tv_usec << "us");
+
+ // convert back to milliseconds, avoiding int overflows
+ if (timeout.tv_sec >= std::numeric_limits<int>::max()/1000 - 1000)
+ return std::numeric_limits<int>::max();
+ else
+ return timeout.tv_sec*1000 + timeout.tv_usec/1000;
+}
+
+/// resumes async transactions (if any) and returns true if they set a timeout
+void
+Adaptation::Ecap::Engine::kickAsyncServices(timeval &timeout)
+{
+ if (AsyncServices.empty())
+ return;
+
+ debugs(93, 3, "async services: " << AsyncServices.size());
+
+ // Activate waiting async transactions, if any.
+ typedef AdapterServices::iterator ASI;
+ for (ASI s = AsyncServices.begin(); s != AsyncServices.end(); ++s) {
+ assert(s->second);
+ s->second->resume(); // may call Ecap::Xaction::resume()
+ }
+
+ // Give services a chance to decrease the default timeout.
+ for (ASI s = AsyncServices.begin(); s != AsyncServices.end(); ++s) {
+ s->second->suspend(timeout);
+ }
+}
+
+/* Adaptation::Ecap::ServiceRep */
+
Adaptation::Ecap::ServiceRep::ServiceRep(const ServiceConfigPointer &cfg):
/*AsyncJob("Adaptation::Ecap::ServiceRep"),*/ Adaptation::Service(cfg),
isDetached(false)
debugs(93,DBG_IMPORTANT, "Starting eCAP service: " << theService->uri());
theService->start();
+
+ if (theService->makesAsyncXactions()) {
+ AsyncServices[theService->uri()] = theService;
+ debugs(93, 5, "asyncs: " << AsyncServices.size());
+ }
}
/// handles failures while configuring or starting an eCAP service;
HttpRequest *cause)
{
Must(up());
+
+ // register now because (a) we need EventLoop::Running and (b) we do not
+ // want to add more main loop overheads unless an async service is used.
+ static AsyncEngine *TheEngine = NULL;
+ if (AsyncServices.size() && !TheEngine && EventLoop::Running) {
+ TheEngine = new Engine;
+ EventLoop::Running->registerEngine(TheEngine);
+ debugs(93, 3, "asyncs: " << AsyncServices.size() << ' ' << TheEngine);
+ }
+
XactionRep *rep = new XactionRep(virgin, cause, Pointer(this));
XactionRep::AdapterXaction x(theService->makeXaction(rep));
rep->master(x);
Adaptation::Ecap::ServiceRep::AdapterService
Adaptation::Ecap::FindAdapterService(const String& serviceUri)
{
- typedef std::list<ServiceRep::AdapterService>::const_iterator ASCI;
- for (ASCI s = TheServices.begin(); s != TheServices.end(); ++s) {
- Must(*s);
- if (serviceUri == (*s)->uri().c_str())
- return *s;
+ AdapterServices::const_iterator pos = TheServices.find(serviceUri.termedBuf());
+ if (pos != TheServices.end()) {
+ Must(pos->second);
+ return pos->second;
}
return ServiceRep::AdapterService();
}
void
Adaptation::Ecap::RegisterAdapterService(const Adaptation::Ecap::ServiceRep::AdapterService& adapterService)
{
- typedef std::list<ServiceRep::AdapterService>::iterator ASI;
- for (ASI s = TheServices.begin(); s != TheServices.end(); ++s) {
- Must(*s);
- if (adapterService->uri() == (*s)->uri()) {
- *s = adapterService;
- debugs(93, 3, "updated eCAP module service: " <<
- adapterService->uri());
- return;
- }
- }
- TheServices.push_back(adapterService);
- debugs(93, 3, "registered eCAP module service: " << adapterService->uri());
+ TheServices[adapterService->uri()] = adapterService; // may update old one
+ debugs(93, 3, "stored eCAP module service: " << adapterService->uri());
+ // We do not update AsyncServices here in case they are not configured.
}
void
Adaptation::Ecap::UnregisterAdapterService(const String& serviceUri)
{
- typedef std::list<ServiceRep::AdapterService>::iterator ASI;
- for (ASI s = TheServices.begin(); s != TheServices.end(); ++s) {
- if (serviceUri == (*s)->uri().c_str()) {
- TheServices.erase(s);
- debugs(93, 3, "unregistered eCAP module service: " << serviceUri);
- return;
- }
+ if (TheServices.erase(serviceUri.termedBuf())) {
+ debugs(93, 3, "unregistered eCAP module service: " << serviceUri);
+ AsyncServices.erase(serviceUri.termedBuf()); // no-op for non-async
+ return;
}
debugs(93, 3, "failed to unregister eCAP module service: " << serviceUri);
}
void
Adaptation::Ecap::CheckUnusedAdapterServices(const Adaptation::Services& cfgs)
{
- typedef std::list<ServiceRep::AdapterService>::const_iterator ASCI;
+ typedef AdapterServices::const_iterator ASCI;
for (ASCI loaded = TheServices.begin(); loaded != TheServices.end();
++loaded) {
bool found = false;
for (Services::const_iterator cfged = cfgs.begin();
cfged != cfgs.end() && !found; ++cfged) {
- found = (*cfged)->cfg().uri == (*loaded)->uri().c_str();
+ found = (*cfged)->cfg().uri == loaded->second->uri().c_str();
}
if (!found)
debugs(93, DBG_IMPORTANT, "Warning: loaded eCAP service has no matching " <<
- "ecap_service config option: " << (*loaded)->uri());
+ "ecap_service config option: " << loaded->second->uri());
}
}
#include "adaptation/ecap/Config.h"
#include "adaptation/ecap/XactionRep.h"
#include "adaptation/Initiator.h"
+#include "base/AsyncJobCalls.h"
#include "base/TextException.h"
#include "HttpReply.h"
#include "HttpRequest.h"
Adaptation::Initiate::swanSong();
}
+void
+Adaptation::Ecap::XactionRep::resume()
+{
+ // go async to gain exception protection and done()-based job destruction
+ typedef NullaryMemFunT<Adaptation::Ecap::XactionRep> Dialer;
+ AsyncCall::Pointer call = asyncCall(93, 5, "Adaptation::Ecap::XactionRep::doResume",
+ Dialer(this, &Adaptation::Ecap::XactionRep::doResume));
+ ScheduleCallHere(call);
+}
+
+/// the guts of libecap::host::Xaction::resume() API implementation
+/// which just goes async in Adaptation::Ecap::XactionRep::resume().
+void
+Adaptation::Ecap::XactionRep::doResume()
+{
+ Must(theMaster);
+ theMaster->resume();
+}
+
libecap::Message &
Adaptation::Ecap::XactionRep::virgin()
{
mustStop("adaptationAborted");
}
-bool
-Adaptation::Ecap::XactionRep::callable() const
-{
- return !done();
-}
-
void
Adaptation::Ecap::XactionRep::noteMoreBodySpaceAvailable(RefCount<BodyPipe> bp)
{
virtual void blockVirgin();
virtual void adaptationDelayed(const libecap::Delay &);
virtual void adaptationAborted();
+ virtual void resume();
virtual void vbDiscard();
virtual void vbMake();
virtual void vbStopMaking();
virtual void noteAbContentDone(bool atEnd);
virtual void noteAbContentAvailable();
- // libecap::Callable API, via libecap::host::Xaction
- virtual bool callable() const;
-
// BodyProducer API
virtual void noteMoreBodySpaceAvailable(RefCount<BodyPipe> bp);
virtual void noteBodyConsumerAborted(RefCount<BodyPipe> bp);
/// Return the adaptation meta headers and their values
void visitEachMetaHeader(libecap::NamedValueVisitor &visitor) const;
+ void doResume();
+
private:
AdapterXaction theMaster; // the actual adaptation xaction we represent
Adaptation::ServicePointer theService; ///< xaction's adaptation service
{
public:
- SignalEngine(EventLoop &evtLoop) : loop(evtLoop) {}
virtual int checkEvents(int timeout);
private:
- static void StopEventLoop(void * data) {
- static_cast<SignalEngine *>(data)->loop.stop();
+ static void StopEventLoop(void *) {
+ if (EventLoop::Running)
+ EventLoop::Running->stop();
}
void doShutdown(time_t wait);
-
- EventLoop &loop;
};
int
/* main loop */
EventLoop mainLoop;
- SignalEngine signalEngine(mainLoop);
+ SignalEngine signalEngine;
mainLoop.registerEngine(&signalEngine);
--- /dev/null
+#include "squid.h"
+#include "EventLoop.h"
+
+#define STUB_API "EventLoop.cc"
+#include "tests/STUB.h"
+
+EventLoop *EventLoop::Running = NULL;
+
+EventLoop::EventLoop(): errcount(0), last_loop(false), timeService(NULL),
+ primaryEngine(NULL), loop_delay(0), error(false), runOnceResult(false)
+ STUB_NOP
+
+void EventLoop::registerEngine(AsyncEngine *engine) STUB