]> git.ipfire.org Git - thirdparty/squid.git/blobdiff - src/mgr/Inquirer.cc
SourceFormat Enforcement
[thirdparty/squid.git] / src / mgr / Inquirer.cc
index 411904519361e515b64c3dec96288862c9e80eb2..9eb412dea68a646828d329b3ef1381c825f9d528 100644 (file)
@@ -1,48 +1,52 @@
 /*
- * $Id$
- *
  * DEBUG: section 16    Cache Manager API
  *
  */
 
-#include "config.h"
+#include "squid.h"
 #include "base/TextException.h"
+#include "comm.h"
+#include "comm/Connection.h"
 #include "comm/Write.h"
 #include "CommCalls.h"
+#include "errorpage.h"
 #include "HttpReply.h"
+#include "HttpRequest.h"
 #include "ipc/UdsOp.h"
 #include "mgr/ActionWriter.h"
+#include "mgr/Command.h"
 #include "mgr/Inquirer.h"
+#include "mgr/IntParam.h"
 #include "mgr/Request.h"
 #include "mgr/Response.h"
 #include "SquidTime.h"
 #include <memory>
-
+#include <algorithm>
 
 CBDATA_NAMESPACED_CLASS_INIT(Mgr, Inquirer);
 
-
 Mgr::Inquirer::Inquirer(Action::Pointer anAction,
                         const Request &aCause, const Ipc::StrandCoords &coords):
-        Ipc::Inquirer(aCause.clone(), coords, anAction->atomic() ? 10 : 100),
-        aggrAction(anAction),
-        fd(Ipc::ImportFdIntoComm(aCause.fd, SOCK_STREAM, IPPROTO_TCP, Ipc::fdnHttpSocket))
+        Ipc::Inquirer(aCause.clone(), applyQueryParams(coords, aCause.params.queryParams), anAction->atomic() ? 10 : 100),
+        aggrAction(anAction)
 {
-    debugs(16, 5, HERE << "FD " << fd << " action: " << aggrAction);
+    conn = aCause.conn;
+    Ipc::ImportFdIntoComm(conn, SOCK_STREAM, IPPROTO_TCP, Ipc::fdnHttpSocket);
+
+    debugs(16, 5, HERE << conn << " action: " << aggrAction);
 
     closer = asyncCall(16, 5, "Mgr::Inquirer::noteCommClosed",
                        CommCbMemFunT<Inquirer, CommCloseCbParams>(this, &Inquirer::noteCommClosed));
-    comm_add_close_handler(fd, closer);
+    comm_add_close_handler(conn->fd, closer);
 }
 
 /// closes our copy of the client HTTP connection socket
 void
 Mgr::Inquirer::cleanup()
 {
-    if (fd >= 0) {
+    if (Comm::IsConnOpen(conn)) {
         removeCloseHandler();
-        comm_close(fd);
-        fd = -1;
+        conn->close();
     }
 }
 
@@ -50,7 +54,7 @@ void
 Mgr::Inquirer::removeCloseHandler()
 {
     if (closer != NULL) {
-        comm_remove_close_handler(fd, closer);
+        comm_remove_close_handler(conn->fd, closer);
         closer = NULL;
     }
 }
@@ -60,16 +64,38 @@ Mgr::Inquirer::start()
 {
     debugs(16, 5, HERE);
     Ipc::Inquirer::start();
-    Must(fd >= 0);
+    Must(Comm::IsConnOpen(conn));
     Must(aggrAction != NULL);
 
-    std::auto_ptr<HttpReply> reply(new HttpReply);
-    reply->setHeaders(HTTP_OK, NULL, "text/plain", -1, squid_curtime, squid_curtime);
-    reply->header.putStr(HDR_CONNECTION, "close"); // until we chunk response
-    std::auto_ptr<MemBuf> replyBuf(reply->pack());
+#if HAVE_UNIQUE_PTR
+    std::unique_ptr<MemBuf> replyBuf;
+#else
+    std::auto_ptr<MemBuf> replyBuf;
+#endif
+    if (strands.empty()) {
+        LOCAL_ARRAY(char, url, MAX_URL);
+        snprintf(url, MAX_URL, "%s", aggrAction->command().params.httpUri.termedBuf());
+        HttpRequest *req = HttpRequest::CreateFromUrl(url);
+        ErrorState err(ERR_INVALID_URL, Http::scNotFound, req);
+#if HAVE_UNIQUE_PTR
+        std::unique_ptr<HttpReply> reply(err.BuildHttpReply());
+#else
+        std::auto_ptr<HttpReply> reply(err.BuildHttpReply());
+#endif
+        replyBuf.reset(reply->pack());
+    } else {
+#if HAVE_UNIQUE_PTR
+        std::unique_ptr<HttpReply> reply(new HttpReply);
+#else
+        std::auto_ptr<HttpReply> reply(new HttpReply);
+#endif
+        reply->setHeaders(Http::scOkay, NULL, "text/plain", -1, squid_curtime, squid_curtime);
+        reply->header.putStr(HDR_CONNECTION, "close"); // until we chunk response
+        replyBuf.reset(reply->pack());
+    }
     writer = asyncCall(16, 5, "Mgr::Inquirer::noteWroteHeader",
                        CommCbMemFunT<Inquirer, CommIoCbParams>(this, &Inquirer::noteWroteHeader));
-    Comm::Write(fd, replyBuf.get(), writer);
+    Comm::Write(conn, replyBuf.get(), writer);
 }
 
 /// called when we wrote the response header
@@ -79,7 +105,7 @@ Mgr::Inquirer::noteWroteHeader(const CommIoCbParams& params)
     debugs(16, 5, HERE);
     writer = NULL;
     Must(params.flag == COMM_OK);
-    Must(params.fd == fd);
+    Must(params.conn.getRaw() == conn.getRaw());
     Must(params.size != 0);
     // start inquiries at the initial pos
     inquire();
@@ -90,8 +116,8 @@ void
 Mgr::Inquirer::noteCommClosed(const CommCloseCbParams& params)
 {
     debugs(16, 5, HERE);
-    Must(fd < 0 || fd == params.fd);
-    fd = -1;
+    Must(!Comm::IsConnOpen(conn) && params.conn.getRaw() == conn.getRaw());
+    conn = NULL;
     mustStop("commClosed");
 }
 
@@ -107,10 +133,10 @@ Mgr::Inquirer::aggregate(Ipc::Response::Pointer aResponse)
 void
 Mgr::Inquirer::sendResponse()
 {
-    if (aggrAction->aggregatable()) {
+    if (!strands.empty() && aggrAction->aggregatable()) {
         removeCloseHandler();
-        AsyncJob::Start(new ActionWriter(aggrAction, fd));
-        fd = -1; // should not close fd because we passed it to ActionWriter
+        AsyncJob::Start(new ActionWriter(aggrAction, conn));
+        conn = NULL; // should not close because we passed it to ActionWriter
     }
 }
 
@@ -119,3 +145,44 @@ Mgr::Inquirer::doneAll() const
 {
     return !writer && Ipc::Inquirer::doneAll();
 }
+
+Ipc::StrandCoords
+Mgr::Inquirer::applyQueryParams(const Ipc::StrandCoords& aStrands, const QueryParams& aParams)
+{
+    Ipc::StrandCoords sc;
+
+    QueryParam::Pointer processesParam = aParams.get("processes");
+    QueryParam::Pointer workersParam = aParams.get("workers");
+
+    if (processesParam == NULL || workersParam == NULL) {
+        if (processesParam != NULL) {
+            IntParam* param = dynamic_cast<IntParam*>(processesParam.getRaw());
+            if (param != NULL && param->type == QueryParam::ptInt) {
+                const std::vector<int>& processes = param->value();
+                for (Ipc::StrandCoords::const_iterator iter = aStrands.begin();
+                        iter != aStrands.end(); ++iter) {
+                    if (std::find(processes.begin(), processes.end(), iter->kidId) != processes.end())
+                        sc.push_back(*iter);
+                }
+            }
+        } else if (workersParam != NULL) {
+            IntParam* param = dynamic_cast<IntParam*>(workersParam.getRaw());
+            if (param != NULL && param->type == QueryParam::ptInt) {
+                const std::vector<int>& workers = param->value();
+                for (int i = 0; i < (int)aStrands.size(); ++i) {
+                    if (std::find(workers.begin(), workers.end(), i + 1) != workers.end())
+                        sc.push_back(aStrands[i]);
+                }
+            }
+        } else {
+            sc = aStrands;
+        }
+    }
+
+    debugs(16, 4, HERE << "strands kid IDs = ");
+    for (Ipc::StrandCoords::const_iterator iter = sc.begin(); iter != sc.end(); ++iter) {
+        debugs(16, 4, HERE << iter->kidId);
+    }
+
+    return sc;
+}