* DEBUG: section 93 eCAP Interface
*/
#include "squid.h"
-#include "Debug.h"
-#include <list>
-#include <libecap/adapter/service.h>
-#include <libecap/common/options.h>
-#include <libecap/common/name.h>
-#include <libecap/common/named_values.h>
#include "adaptation/ecap/Config.h"
#include "adaptation/ecap/Host.h"
#include "adaptation/ecap/ServiceRep.h"
#include "adaptation/ecap/XactionRep.h"
+#include "AsyncEngine.h"
#include "base/TextException.h"
+#include "Debug.h"
+#include "EventLoop.h"
+
+#include <libecap/adapter/service.h>
+#include <libecap/common/options.h>
+#include <libecap/common/name.h>
+#include <libecap/common/named_values.h>
+#include <limits>
+#include <map>
-// configured eCAP service wrappers
-static std::list<Adaptation::Ecap::ServiceRep::AdapterService> TheServices;
+/// libecap::adapter::services indexed by their URI
+typedef std::map<std::string, Adaptation::Ecap::ServiceRep::AdapterService> AdapterServices;
+/// all loaded services
+static AdapterServices TheServices;
+/// configured services producing async transactions
+static AdapterServices AsyncServices;
namespace Adaptation
{
const Master &master; ///< the configuration being wrapped
};
+/// manages async eCAP transactions
+class Engine: public AsyncEngine
+{
+public:
+ /* AsyncEngine API */
+ virtual int checkEvents(int timeout);
+
+private:
+ void kickAsyncServices(timeval &timeout);
+};
+
} // namespace Ecap
} // namespace Adaptation
visitor.visit(Name(i->first), Area::FromTempString(i->second));
}
+/* Adaptation::Ecap::Engine */
+
+int
+Adaptation::Ecap::Engine::checkEvents(int)
+{
+ // Start with the default I/O loop timeout, convert from milliseconds.
+ static const struct timeval maxTimeout = {
+ EVENT_LOOP_TIMEOUT/1000, // seconds
+ (EVENT_LOOP_TIMEOUT % 1000)*1000
+ }; // microseconds
+ struct timeval timeout = maxTimeout;
+
+ kickAsyncServices(timeout);
+ if (timeout.tv_sec == maxTimeout.tv_sec && timeout.tv_usec == maxTimeout.tv_usec)
+ return EVENT_IDLE;
+
+ debugs(93, 7, "timeout: " << timeout.tv_sec << "s+" << timeout.tv_usec << "us");
+
+ // convert back to milliseconds, avoiding int overflows
+ if (timeout.tv_sec >= std::numeric_limits<int>::max()/1000 - 1000)
+ return std::numeric_limits<int>::max();
+ else
+ return timeout.tv_sec*1000 + timeout.tv_usec/1000;
+}
+
+/// resumes async transactions (if any) and returns true if they set a timeout
+void
+Adaptation::Ecap::Engine::kickAsyncServices(timeval &timeout)
+{
+ if (AsyncServices.empty())
+ return;
+
+ debugs(93, 3, "async services: " << AsyncServices.size());
+
+ // Activate waiting async transactions, if any.
+ typedef AdapterServices::iterator ASI;
+ for (ASI s = AsyncServices.begin(); s != AsyncServices.end(); ++s) {
+ assert(s->second);
+ s->second->resume(); // may call Ecap::Xaction::resume()
+ }
+
+ // Give services a chance to decrease the default timeout.
+ for (ASI s = AsyncServices.begin(); s != AsyncServices.end(); ++s) {
+ s->second->suspend(timeout);
+ }
+}
+
+/* Adaptation::Ecap::ServiceRep */
+
Adaptation::Ecap::ServiceRep::ServiceRep(const ServiceConfigPointer &cfg):
/*AsyncJob("Adaptation::Ecap::ServiceRep"),*/ Adaptation::Service(cfg),
isDetached(false)
debugs(93,DBG_IMPORTANT, "Starting eCAP service: " << theService->uri());
theService->start();
+
+ if (theService->makesAsyncXactions()) {
+ AsyncServices[theService->uri()] = theService;
+ debugs(93, 5, "asyncs: " << AsyncServices.size());
+ }
}
/// handles failures while configuring or starting an eCAP service;
Adaptation::Initiate *
Adaptation::Ecap::ServiceRep::makeXactLauncher(HttpMsg *virgin,
- HttpRequest *cause)
+ HttpRequest *cause, AccessLogEntry::Pointer &alp)
{
Must(up());
- XactionRep *rep = new XactionRep(virgin, cause, Pointer(this));
+
+ // register now because (a) we need EventLoop::Running and (b) we do not
+ // want to add more main loop overheads unless an async service is used.
+ static AsyncEngine *TheEngine = NULL;
+ if (AsyncServices.size() && !TheEngine && EventLoop::Running) {
+ TheEngine = new Engine;
+ EventLoop::Running->registerEngine(TheEngine);
+ debugs(93, 3, "asyncs: " << AsyncServices.size() << ' ' << TheEngine);
+ }
+
+ XactionRep *rep = new XactionRep(virgin, cause, alp, Pointer(this));
XactionRep::AdapterXaction x(theService->makeXaction(rep));
rep->master(x);
return rep;
Adaptation::Ecap::ServiceRep::AdapterService
Adaptation::Ecap::FindAdapterService(const String& serviceUri)
{
- typedef std::list<ServiceRep::AdapterService>::const_iterator ASCI;
- for (ASCI s = TheServices.begin(); s != TheServices.end(); ++s) {
- Must(*s);
- if (serviceUri == (*s)->uri().c_str())
- return *s;
+ AdapterServices::const_iterator pos = TheServices.find(serviceUri.termedBuf());
+ if (pos != TheServices.end()) {
+ Must(pos->second);
+ return pos->second;
}
return ServiceRep::AdapterService();
}
void
Adaptation::Ecap::RegisterAdapterService(const Adaptation::Ecap::ServiceRep::AdapterService& adapterService)
{
- typedef std::list<ServiceRep::AdapterService>::iterator ASI;
- for (ASI s = TheServices.begin(); s != TheServices.end(); ++s) {
- Must(*s);
- if (adapterService->uri() == (*s)->uri()) {
- *s = adapterService;
- debugs(93, 3, "updated eCAP module service: " <<
- adapterService->uri());
- return;
- }
- }
- TheServices.push_back(adapterService);
- debugs(93, 3, "registered eCAP module service: " << adapterService->uri());
+ TheServices[adapterService->uri()] = adapterService; // may update old one
+ debugs(93, 3, "stored eCAP module service: " << adapterService->uri());
+ // We do not update AsyncServices here in case they are not configured.
}
void
Adaptation::Ecap::UnregisterAdapterService(const String& serviceUri)
{
- typedef std::list<ServiceRep::AdapterService>::iterator ASI;
- for (ASI s = TheServices.begin(); s != TheServices.end(); ++s) {
- if (serviceUri == (*s)->uri().c_str()) {
- TheServices.erase(s);
- debugs(93, 3, "unregistered eCAP module service: " << serviceUri);
- return;
- }
+ if (TheServices.erase(serviceUri.termedBuf())) {
+ debugs(93, 3, "unregistered eCAP module service: " << serviceUri);
+ AsyncServices.erase(serviceUri.termedBuf()); // no-op for non-async
+ return;
}
debugs(93, 3, "failed to unregister eCAP module service: " << serviceUri);
}
void
Adaptation::Ecap::CheckUnusedAdapterServices(const Adaptation::Services& cfgs)
{
- typedef std::list<ServiceRep::AdapterService>::const_iterator ASCI;
+ typedef AdapterServices::const_iterator ASCI;
for (ASCI loaded = TheServices.begin(); loaded != TheServices.end();
++loaded) {
bool found = false;
for (Services::const_iterator cfged = cfgs.begin();
cfged != cfgs.end() && !found; ++cfged) {
- found = (*cfged)->cfg().uri == (*loaded)->uri().c_str();
+ found = (*cfged)->cfg().uri == loaded->second->uri().c_str();
}
if (!found)
debugs(93, DBG_IMPORTANT, "Warning: loaded eCAP service has no matching " <<
- "ecap_service config option: " << (*loaded)->uri());
+ "ecap_service config option: " << loaded->second->uri());
}
}