]> git.ipfire.org Git - thirdparty/squid.git/blobdiff - src/ipc/Coordinator.cc
Renamed squid.h to squid-old.h and config.h to squid.h
[thirdparty/squid.git] / src / ipc / Coordinator.cc
index afdec48cadd9c0a51508298d4d5140a2b85f5aa5..6f1a10a31ff374071b4ce08cc7aad9cdd5bd81ed 100644 (file)
@@ -6,19 +6,22 @@
  */
 
 
-#include "config.h"
+#include "squid.h"
+#include "base/Subscription.h"
 #include "base/TextException.h"
 #include "CacheManager.h"
 #include "comm.h"
+#include "comm/Connection.h"
 #include "ipc/Coordinator.h"
-#include "ipc/FdNotes.h"
 #include "ipc/SharedListen.h"
 #include "mgr/Inquirer.h"
 #include "mgr/Request.h"
 #include "mgr/Response.h"
-#include "mgr/StoreToCommWriter.h"
-#include "DiskIO/IpcIo/IpcIoFile.h" /* XXX: layering violation */
-
+#if SQUID_SNMP
+#include "snmp/Inquirer.h"
+#include "snmp/Request.h"
+#include "snmp/Response.h"
+#endif
 
 CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator);
 Ipc::Coordinator* Ipc::Coordinator::TheInstance = NULL;
@@ -63,9 +66,9 @@ void Ipc::Coordinator::registerStrand(const StrandCoord& strand)
         if (i->tag == strand.tag) {
             notifySearcher(*i, strand);
             i = searchers.erase(i);
-               } else {
+        } else {
             ++i;
-               }
+        }
     }
 }
 
@@ -77,31 +80,48 @@ void Ipc::Coordinator::receive(const TypedMsgHdr& message)
         handleRegistrationRequest(HereIamMessage(message));
         break;
 
-    case mtIpcIoRequest: { // XXX: this should have been mtStrandSearchRequest
-        IpcIoRequest io(message);
-        StrandSearchRequest sr;
-        sr.requestorId = io.requestorId;
-        sr.requestId = io.requestId;
-        sr.tag.limitInit(io.buf, io.len);
-        debugs(54, 6, HERE << "Strand search request: " << io.requestorId << ' ' << io.requestId << ' ' << io.len << " cmd=" << io.command << " tag: " << sr.tag);
+    case mtStrandSearchRequest: {
+        const StrandSearchRequest sr(message);
+        debugs(54, 6, HERE << "Strand search request: " << sr.requestorId <<
+               " tag: " << sr.tag);
         handleSearchRequest(sr);
         break;
-       }
+    }
 
     case mtSharedListenRequest:
         debugs(54, 6, HERE << "Shared listen request");
         handleSharedListenRequest(SharedListenRequest(message));
         break;
 
-    case mtCacheMgrRequest:
+    case mtCacheMgrRequest: {
         debugs(54, 6, HERE << "Cache manager request");
-        handleCacheMgrRequest(Mgr::Request(message));
-        break;
+        const Mgr::Request req(message);
+        handleCacheMgrRequest(req);
+    }
+    break;
 
-    case mtCacheMgrResponse:
+    case mtCacheMgrResponse: {
         debugs(54, 6, HERE << "Cache manager response");
-        handleCacheMgrResponse(Mgr::Response(message));
-        break;
+        const Mgr::Response resp(message);
+        handleCacheMgrResponse(resp);
+    }
+    break;
+
+#if SQUID_SNMP
+    case mtSnmpRequest: {
+        debugs(54, 6, HERE << "SNMP request");
+        const Snmp::Request req(message);
+        handleSnmpRequest(req);
+    }
+    break;
+
+    case mtSnmpResponse: {
+        debugs(54, 6, HERE << "SNMP response");
+        const Snmp::Response resp(message);
+        handleSnmpResponse(resp);
+    }
+    break;
+#endif
 
     default:
         debugs(54, 1, HERE << "Unhandled message type: " << message.type());
@@ -126,14 +146,14 @@ Ipc::Coordinator::handleSharedListenRequest(const SharedListenRequest& request)
            " needs shared listen FD for " << request.params.addr);
     Listeners::const_iterator i = listeners.find(request.params);
     int errNo = 0;
-    const int sock = (i != listeners.end()) ?
-                     i->second : openListenSocket(request, errNo);
+    const Comm::ConnectionPointer c = (i != listeners.end()) ?
+                                      i->second : openListenSocket(request, errNo);
 
-    debugs(54, 3, HERE << "sending shared listen FD " << sock << " for " <<
+    debugs(54, 3, HERE << "sending shared listen " << c << " for " <<
            request.params.addr << " to kid" << request.requestorId <<
            " mapId=" << request.mapId);
 
-    SharedListenResponse response(sock, errNo, request.mapId);
+    SharedListenResponse response(c->fd, errNo, request.mapId);
     TypedMsgHdr message;
     response.pack(message);
     SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
@@ -144,16 +164,25 @@ Ipc::Coordinator::handleCacheMgrRequest(const Mgr::Request& request)
 {
     debugs(54, 4, HERE);
 
+    try {
+        Mgr::Action::Pointer action =
+            CacheManager::GetInstance()->createRequestedAction(request.params);
+        AsyncJob::Start(new Mgr::Inquirer(action, request, strands_));
+    } catch (const std::exception &ex) {
+        debugs(54, DBG_IMPORTANT, "BUG: cannot aggregate mgr:" <<
+               request.params.actionName << ": " << ex.what());
+        // TODO: Avoid half-baked Connections or teach them how to close.
+        ::close(request.conn->fd);
+        request.conn->fd = -1;
+        return; // the worker will timeout and close
+    }
+
     // Let the strand know that we are now responsible for handling the request
     Mgr::Response response(request.requestId);
     TypedMsgHdr message;
     response.pack(message);
     SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
 
-    Mgr::Action::Pointer action =
-        CacheManager::GetInstance()->createRequestedAction(request.params);
-    AsyncJob::Start(new Mgr::Inquirer(action,
-                                      Mgr::ImportHttpFdIntoComm(request.fd), request, strands_));
 }
 
 void
@@ -176,11 +205,11 @@ Ipc::Coordinator::handleSearchRequest(const Ipc::StrandSearchRequest &request)
     if (strand) {
         notifySearcher(request, *strand);
         return;
-       }
+    }
 
     searchers.push_back(request);
     debugs(54, 3, HERE << "cannot yet tell kid" << request.requestorId <<
-        " who " << request.tag << " is");
+           " who " << request.tag << " is");
 }
 
 void
@@ -188,20 +217,36 @@ Ipc::Coordinator::notifySearcher(const Ipc::StrandSearchRequest &request,
                                  const StrandCoord& strand)
 {
     debugs(54, 3, HERE << "tell kid" << request.requestorId << " that " <<
-        request.tag << " is kid" << strand.kidId);
-    const StrandSearchResponse response0(request.requestId, strand);
-    // XXX: we should use StrandSearchResponse instead of converting it
-    IpcIoResponse io;
-    io.diskId = strand.kidId;
-    io.requestId = request.requestId;
-    io.command = IpcIo::cmdOpen;
+           request.tag << " is kid" << strand.kidId);
+    const StrandSearchResponse response(strand);
+    TypedMsgHdr message;
+    response.pack(message);
+    SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
+}
+
+#if SQUID_SNMP
+void
+Ipc::Coordinator::handleSnmpRequest(const Snmp::Request& request)
+{
+    debugs(54, 4, HERE);
+
+    Snmp::Response response(request.requestId);
     TypedMsgHdr message;
-    io.pack(message);
+    response.pack(message);
     SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
+
+    AsyncJob::Start(new Snmp::Inquirer(request, strands_));
 }
 
+void
+Ipc::Coordinator::handleSnmpResponse(const Snmp::Response& response)
+{
+    debugs(54, 4, HERE);
+    Snmp::Inquirer::HandleRemoteAck(response);
+}
+#endif
 
-int
+Comm::ConnectionPointer
 Ipc::Coordinator::openListenSocket(const SharedListenRequest& request,
                                    int &errNo)
 {
@@ -210,19 +255,23 @@ Ipc::Coordinator::openListenSocket(const SharedListenRequest& request,
     debugs(54, 6, HERE << "opening listen FD at " << p.addr << " for kid" <<
            request.requestorId);
 
-    Ip::Address addr = p.addr; // comm_open_listener may modify it
+    Comm::ConnectionPointer conn = new Comm::Connection;
+    conn->local = p.addr; // comm_open_listener may modify it
+    conn->flags = p.flags;
 
     enter_suid();
-    const int sock = comm_open_listener(p.sock_type, p.proto, addr, p.flags,
-                                        FdNote(p.fdNote));
-    errNo = (sock >= 0) ? 0 : errno;
+    comm_open_listener(p.sock_type, p.proto, conn, FdNote(p.fdNote));
+    errNo = Comm::IsConnOpen(conn) ? 0 : errno;
     leave_suid();
 
+    debugs(54, 6, HERE << "tried listening on " << conn << " for kid" <<
+           request.requestorId);
+
     // cache positive results
-    if (sock >= 0)
-        listeners[request.params] = sock;
+    if (Comm::IsConnOpen(conn))
+        listeners[request.params] = conn;
 
-    return sock;
+    return conn;
 }
 
 void Ipc::Coordinator::broadcastSignal(int sig) const