]> git.ipfire.org Git - thirdparty/squid.git/blobdiff - src/adaptation/ecap/ServiceRep.cc
Merged from trunk (r13356).
[thirdparty/squid.git] / src / adaptation / ecap / ServiceRep.cc
index 260850591bd68334f363aff07e66fb8ea1eb296e..c567dc13032bb22a9b67a9f9dc0333a8a98f3c24 100644 (file)
@@ -2,20 +2,28 @@
  * DEBUG: section 93    eCAP Interface
  */
 #include "squid.h"
-#include "Debug.h"
-#include <list>
-#include <libecap/adapter/service.h>
-#include <libecap/common/options.h>
-#include <libecap/common/name.h>
-#include <libecap/common/named_values.h>
 #include "adaptation/ecap/Config.h"
 #include "adaptation/ecap/Host.h"
 #include "adaptation/ecap/ServiceRep.h"
 #include "adaptation/ecap/XactionRep.h"
+#include "AsyncEngine.h"
 #include "base/TextException.h"
+#include "Debug.h"
+#include "EventLoop.h"
+
+#include <libecap/adapter/service.h>
+#include <libecap/common/options.h>
+#include <libecap/common/name.h>
+#include <libecap/common/named_values.h>
+#include <limits>
+#include <map>
 
-// configured eCAP service wrappers
-static std::list<Adaptation::Ecap::ServiceRep::AdapterService> TheServices;
+/// libecap::adapter::services indexed by their URI
+typedef std::map<std::string, Adaptation::Ecap::ServiceRep::AdapterService> AdapterServices;
+/// all loaded services
+static AdapterServices TheServices;
+/// configured services producing async transactions
+static AdapterServices AsyncServices;
 
 namespace Adaptation
 {
@@ -39,6 +47,17 @@ public:
     const Master &master; ///< the configuration being wrapped
 };
 
+/// manages async eCAP transactions
+class Engine: public AsyncEngine
+{
+public:
+    /* AsyncEngine API */
+    virtual int checkEvents(int timeout);
+
+private:
+    void kickAsyncServices(timeval &timeout);
+};
+
 } // namespace Ecap
 } // namespace Adaptation
 
@@ -76,6 +95,55 @@ Adaptation::Ecap::ConfigRep::visitEachOption(libecap::NamedValueVisitor &visitor
         visitor.visit(Name(i->first), Area::FromTempString(i->second));
 }
 
+/* Adaptation::Ecap::Engine */
+
+int
+Adaptation::Ecap::Engine::checkEvents(int)
+{
+    // Start with the default I/O loop timeout, convert from milliseconds.
+    static const struct timeval maxTimeout = {
+        EVENT_LOOP_TIMEOUT/1000, // seconds
+        (EVENT_LOOP_TIMEOUT % 1000)*1000
+    }; // microseconds
+    struct timeval timeout = maxTimeout;
+
+    kickAsyncServices(timeout);
+    if (timeout.tv_sec == maxTimeout.tv_sec && timeout.tv_usec == maxTimeout.tv_usec)
+        return EVENT_IDLE;
+
+    debugs(93, 7, "timeout: " << timeout.tv_sec << "s+" << timeout.tv_usec << "us");
+
+    // convert back to milliseconds, avoiding int overflows
+    if (timeout.tv_sec >= std::numeric_limits<int>::max()/1000 - 1000)
+        return std::numeric_limits<int>::max();
+    else
+        return timeout.tv_sec*1000 + timeout.tv_usec/1000;
+}
+
+/// resumes async transactions (if any) and returns true if they set a timeout
+void
+Adaptation::Ecap::Engine::kickAsyncServices(timeval &timeout)
+{
+    if (AsyncServices.empty())
+        return;
+
+    debugs(93, 3, "async services: " << AsyncServices.size());
+
+    // Activate waiting async transactions, if any.
+    typedef AdapterServices::iterator ASI;
+    for (ASI s = AsyncServices.begin(); s != AsyncServices.end(); ++s) {
+        assert(s->second);
+        s->second->resume(); // may call Ecap::Xaction::resume()
+    }
+
+    // Give services a chance to decrease the default timeout.
+    for (ASI s = AsyncServices.begin(); s != AsyncServices.end(); ++s) {
+        s->second->suspend(timeout);
+    }
+}
+
+/* Adaptation::Ecap::ServiceRep */
+
 Adaptation::Ecap::ServiceRep::ServiceRep(const ServiceConfigPointer &cfg):
         /*AsyncJob("Adaptation::Ecap::ServiceRep"),*/ Adaptation::Service(cfg),
         isDetached(false)
@@ -123,6 +191,11 @@ Adaptation::Ecap::ServiceRep::tryConfigureAndStart()
 
     debugs(93,DBG_IMPORTANT, "Starting eCAP service: " << theService->uri());
     theService->start();
+
+    if (theService->makesAsyncXactions()) {
+        AsyncServices[theService->uri()] = theService;
+        debugs(93, 5, "asyncs: " << AsyncServices.size());
+    }
 }
 
 /// handles failures while configuring or starting an eCAP service;
@@ -165,10 +238,20 @@ bool Adaptation::Ecap::ServiceRep::wantsUrl(const String &urlPath) const
 
 Adaptation::Initiate *
 Adaptation::Ecap::ServiceRep::makeXactLauncher(HttpMsg *virgin,
-        HttpRequest *cause)
+        HttpRequest *cause, AccessLogEntry::Pointer &alp)
 {
     Must(up());
-    XactionRep *rep = new XactionRep(virgin, cause, Pointer(this));
+
+    // register now because (a) we need EventLoop::Running and (b) we do not
+    // want to add more main loop overheads unless an async service is used.
+    static AsyncEngine *TheEngine = NULL;
+    if (AsyncServices.size() && !TheEngine && EventLoop::Running) {
+        TheEngine = new Engine;
+        EventLoop::Running->registerEngine(TheEngine);
+        debugs(93, 3, "asyncs: " << AsyncServices.size() << ' ' << TheEngine);
+    }
+
+    XactionRep *rep = new XactionRep(virgin, cause, alp, Pointer(this));
     XactionRep::AdapterXaction x(theService->makeXaction(rep));
     rep->master(x);
     return rep;
@@ -210,11 +293,10 @@ bool Adaptation::Ecap::ServiceRep::detached() const
 Adaptation::Ecap::ServiceRep::AdapterService
 Adaptation::Ecap::FindAdapterService(const String& serviceUri)
 {
-    typedef std::list<ServiceRep::AdapterService>::const_iterator ASCI;
-    for (ASCI s = TheServices.begin(); s != TheServices.end(); ++s) {
-        Must(*s);
-        if (serviceUri == (*s)->uri().c_str())
-            return *s;
+    AdapterServices::const_iterator pos = TheServices.find(serviceUri.termedBuf());
+    if (pos != TheServices.end()) {
+        Must(pos->second);
+        return pos->second;
     }
     return ServiceRep::AdapterService();
 }
@@ -222,30 +304,18 @@ Adaptation::Ecap::FindAdapterService(const String& serviceUri)
 void
 Adaptation::Ecap::RegisterAdapterService(const Adaptation::Ecap::ServiceRep::AdapterService& adapterService)
 {
-    typedef std::list<ServiceRep::AdapterService>::iterator ASI;
-    for (ASI s = TheServices.begin(); s != TheServices.end(); ++s) {
-        Must(*s);
-        if (adapterService->uri() == (*s)->uri()) {
-            *s = adapterService;
-            debugs(93, 3, "updated eCAP module service: " <<
-                   adapterService->uri());
-            return;
-        }
-    }
-    TheServices.push_back(adapterService);
-    debugs(93, 3, "registered eCAP module service: " << adapterService->uri());
+    TheServices[adapterService->uri()] = adapterService; // may update old one
+    debugs(93, 3, "stored eCAP module service: " << adapterService->uri());
+    // We do not update AsyncServices here in case they are not configured.
 }
 
 void
 Adaptation::Ecap::UnregisterAdapterService(const String& serviceUri)
 {
-    typedef std::list<ServiceRep::AdapterService>::iterator ASI;
-    for (ASI s = TheServices.begin(); s != TheServices.end(); ++s) {
-        if (serviceUri == (*s)->uri().c_str()) {
-            TheServices.erase(s);
-            debugs(93, 3, "unregistered eCAP module service: " << serviceUri);
-            return;
-        }
+    if (TheServices.erase(serviceUri.termedBuf())) {
+        debugs(93, 3, "unregistered eCAP module service: " << serviceUri);
+        AsyncServices.erase(serviceUri.termedBuf()); // no-op for non-async
+        return;
     }
     debugs(93, 3, "failed to unregister eCAP module service: " << serviceUri);
 }
@@ -253,16 +323,16 @@ Adaptation::Ecap::UnregisterAdapterService(const String& serviceUri)
 void
 Adaptation::Ecap::CheckUnusedAdapterServices(const Adaptation::Services& cfgs)
 {
-    typedef std::list<ServiceRep::AdapterService>::const_iterator ASCI;
+    typedef AdapterServices::const_iterator ASCI;
     for (ASCI loaded = TheServices.begin(); loaded != TheServices.end();
             ++loaded) {
         bool found = false;
         for (Services::const_iterator cfged = cfgs.begin();
                 cfged != cfgs.end() && !found; ++cfged) {
-            found = (*cfged)->cfg().uri == (*loaded)->uri().c_str();
+            found = (*cfged)->cfg().uri == loaded->second->uri().c_str();
         }
         if (!found)
             debugs(93, DBG_IMPORTANT, "Warning: loaded eCAP service has no matching " <<
-                   "ecap_service config option: " << (*loaded)->uri());
+                   "ecap_service config option: " << loaded->second->uri());
     }
 }