]> git.ipfire.org Git - thirdparty/squid.git/blobdiff - src/ipc/Coordinator.cc
Renamed squid.h to squid-old.h and config.h to squid.h
[thirdparty/squid.git] / src / ipc / Coordinator.cc
index 3a11401fce7410b42391ae28541f8a76bd952970..6f1a10a31ff374071b4ce08cc7aad9cdd5bd81ed 100644 (file)
  */
 
 
-#include "config.h"
+#include "squid.h"
+#include "base/Subscription.h"
+#include "base/TextException.h"
+#include "CacheManager.h"
+#include "comm.h"
+#include "comm/Connection.h"
 #include "ipc/Coordinator.h"
-
+#include "ipc/SharedListen.h"
+#include "mgr/Inquirer.h"
+#include "mgr/Request.h"
+#include "mgr/Response.h"
+#if SQUID_SNMP
+#include "snmp/Inquirer.h"
+#include "snmp/Request.h"
+#include "snmp/Response.h"
+#endif
 
 CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator);
+Ipc::Coordinator* Ipc::Coordinator::TheInstance = NULL;
 
 
 Ipc::Coordinator::Coordinator():
-    Port(coordinatorPathAddr)
+        Port(coordinatorAddr)
 {
 }
 
 void Ipc::Coordinator::start()
 {
-    listen();
+    Port::start();
 }
 
-Ipc::StrandData* Ipc::Coordinator::findStrand(int kidId)
+Ipc::StrandCoord* Ipc::Coordinator::findStrand(int kidId)
 {
-    for (Vector<StrandData>::iterator iter = strands.begin(); iter != strands.end(); ++iter) {
+    typedef StrandCoords::iterator SI;
+    for (SI iter = strands_.begin(); iter != strands_.end(); ++iter) {
         if (iter->kidId == kidId)
             return &(*iter);
     }
     return NULL;
 }
 
-void Ipc::Coordinator::enrollStrand(const StrandData& strand)
+void Ipc::Coordinator::registerStrand(const StrandCoord& strand)
 {
-    if (StrandData* found = findStrand(strand.kidId))
+    debugs(54, 3, HERE << "registering kid" << strand.kidId <<
+           ' ' << strand.tag);
+    if (StrandCoord* found = findStrand(strand.kidId)) {
+        const String oldTag = found->tag;
         *found = strand;
-    else
-        strands.push_back(strand);
+        if (oldTag.size() && !strand.tag.size())
+            found->tag = oldTag; // keep more detailed info (XXX?)
+    } else {
+        strands_.push_back(strand);
+    }
+
+    // notify searchers waiting for this new strand, if any
+    typedef Searchers::iterator SRI;
+    for (SRI i = searchers.begin(); i != searchers.end();) {
+        if (i->tag == strand.tag) {
+            notifySearcher(*i, strand);
+            i = searchers.erase(i);
+        } else {
+            ++i;
+        }
+    }
 }
 
-void Ipc::Coordinator::handleRead(const Message& message)
+void Ipc::Coordinator::receive(const TypedMsgHdr& message)
 {
     switch (message.type()) {
-    case mtRegister:
+    case mtRegistration:
         debugs(54, 6, HERE << "Registration request");
-        handleRegistrationRequest(message.strand());
+        handleRegistrationRequest(HereIamMessage(message));
+        break;
+
+    case mtStrandSearchRequest: {
+        const StrandSearchRequest sr(message);
+        debugs(54, 6, HERE << "Strand search request: " << sr.requestorId <<
+               " tag: " << sr.tag);
+        handleSearchRequest(sr);
+        break;
+    }
+
+    case mtSharedListenRequest:
+        debugs(54, 6, HERE << "Shared listen request");
+        handleSharedListenRequest(SharedListenRequest(message));
         break;
 
+    case mtCacheMgrRequest: {
+        debugs(54, 6, HERE << "Cache manager request");
+        const Mgr::Request req(message);
+        handleCacheMgrRequest(req);
+    }
+    break;
+
+    case mtCacheMgrResponse: {
+        debugs(54, 6, HERE << "Cache manager response");
+        const Mgr::Response resp(message);
+        handleCacheMgrResponse(resp);
+    }
+    break;
+
+#if SQUID_SNMP
+    case mtSnmpRequest: {
+        debugs(54, 6, HERE << "SNMP request");
+        const Snmp::Request req(message);
+        handleSnmpRequest(req);
+    }
+    break;
+
+    case mtSnmpResponse: {
+        debugs(54, 6, HERE << "SNMP response");
+        const Snmp::Response resp(message);
+        handleSnmpResponse(resp);
+    }
+    break;
+#endif
+
     default:
-        debugs(54, 6, HERE << "Unhandled message of type: " << message.type());
+        debugs(54, 1, HERE << "Unhandled message type: " << message.type());
         break;
     }
 }
 
-void Ipc::Coordinator::handleRegistrationRequest(const StrandData& strand)
+void Ipc::Coordinator::handleRegistrationRequest(const HereIamMessage& msg)
+{
+    registerStrand(msg.strand);
+
+    // send back an acknowledgement; TODO: remove as not needed?
+    TypedMsgHdr message;
+    msg.pack(message);
+    SendMessage(MakeAddr(strandAddrPfx, msg.strand.kidId), message);
+}
+
+void
+Ipc::Coordinator::handleSharedListenRequest(const SharedListenRequest& request)
+{
+    debugs(54, 4, HERE << "kid" << request.requestorId <<
+           " needs shared listen FD for " << request.params.addr);
+    Listeners::const_iterator i = listeners.find(request.params);
+    int errNo = 0;
+    const Comm::ConnectionPointer c = (i != listeners.end()) ?
+                                      i->second : openListenSocket(request, errNo);
+
+    debugs(54, 3, HERE << "sending shared listen " << c << " for " <<
+           request.params.addr << " to kid" << request.requestorId <<
+           " mapId=" << request.mapId);
+
+    SharedListenResponse response(c->fd, errNo, request.mapId);
+    TypedMsgHdr message;
+    response.pack(message);
+    SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
+}
+
+void
+Ipc::Coordinator::handleCacheMgrRequest(const Mgr::Request& request)
+{
+    debugs(54, 4, HERE);
+
+    try {
+        Mgr::Action::Pointer action =
+            CacheManager::GetInstance()->createRequestedAction(request.params);
+        AsyncJob::Start(new Mgr::Inquirer(action, request, strands_));
+    } catch (const std::exception &ex) {
+        debugs(54, DBG_IMPORTANT, "BUG: cannot aggregate mgr:" <<
+               request.params.actionName << ": " << ex.what());
+        // TODO: Avoid half-baked Connections or teach them how to close.
+        ::close(request.conn->fd);
+        request.conn->fd = -1;
+        return; // the worker will timeout and close
+    }
+
+    // Let the strand know that we are now responsible for handling the request
+    Mgr::Response response(request.requestId);
+    TypedMsgHdr message;
+    response.pack(message);
+    SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
+
+}
+
+void
+Ipc::Coordinator::handleCacheMgrResponse(const Mgr::Response& response)
+{
+    Mgr::Inquirer::HandleRemoteAck(response);
+}
+
+void
+Ipc::Coordinator::handleSearchRequest(const Ipc::StrandSearchRequest &request)
+{
+    // do we know of a strand with the given search tag?
+    const StrandCoord *strand = NULL;
+    typedef StrandCoords::const_iterator SCCI;
+    for (SCCI i = strands_.begin(); !strand && i != strands_.end(); ++i) {
+        if (i->tag == request.tag)
+            strand = &(*i);
+    }
+
+    if (strand) {
+        notifySearcher(request, *strand);
+        return;
+    }
+
+    searchers.push_back(request);
+    debugs(54, 3, HERE << "cannot yet tell kid" << request.requestorId <<
+           " who " << request.tag << " is");
+}
+
+void
+Ipc::Coordinator::notifySearcher(const Ipc::StrandSearchRequest &request,
+                                 const StrandCoord& strand)
+{
+    debugs(54, 3, HERE << "tell kid" << request.requestorId << " that " <<
+           request.tag << " is kid" << strand.kidId);
+    const StrandSearchResponse response(strand);
+    TypedMsgHdr message;
+    response.pack(message);
+    SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
+}
+
+#if SQUID_SNMP
+void
+Ipc::Coordinator::handleSnmpRequest(const Snmp::Request& request)
+{
+    debugs(54, 4, HERE);
+
+    Snmp::Response response(request.requestId);
+    TypedMsgHdr message;
+    response.pack(message);
+    SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
+
+    AsyncJob::Start(new Snmp::Inquirer(request, strands_));
+}
+
+void
+Ipc::Coordinator::handleSnmpResponse(const Snmp::Response& response)
+{
+    debugs(54, 4, HERE);
+    Snmp::Inquirer::HandleRemoteAck(response);
+}
+#endif
+
+Comm::ConnectionPointer
+Ipc::Coordinator::openListenSocket(const SharedListenRequest& request,
+                                   int &errNo)
+{
+    const OpenListenerParams &p = request.params;
+
+    debugs(54, 6, HERE << "opening listen FD at " << p.addr << " for kid" <<
+           request.requestorId);
+
+    Comm::ConnectionPointer conn = new Comm::Connection;
+    conn->local = p.addr; // comm_open_listener may modify it
+    conn->flags = p.flags;
+
+    enter_suid();
+    comm_open_listener(p.sock_type, p.proto, conn, FdNote(p.fdNote));
+    errNo = Comm::IsConnOpen(conn) ? 0 : errno;
+    leave_suid();
+
+    debugs(54, 6, HERE << "tried listening on " << conn << " for kid" <<
+           request.requestorId);
+
+    // cache positive results
+    if (Comm::IsConnOpen(conn))
+        listeners[request.params] = conn;
+
+    return conn;
+}
+
+void Ipc::Coordinator::broadcastSignal(int sig) const
+{
+    typedef StrandCoords::const_iterator SCI;
+    for (SCI iter = strands_.begin(); iter != strands_.end(); ++iter) {
+        debugs(54, 5, HERE << "signal " << sig << " to kid" << iter->kidId <<
+               ", PID=" << iter->pid);
+        kill(iter->pid, sig);
+    }
+}
+
+Ipc::Coordinator* Ipc::Coordinator::Instance()
+{
+    if (!TheInstance)
+        TheInstance = new Coordinator;
+    // XXX: if the Coordinator job quits, this pointer will become invalid
+    // we could make Coordinator death fatal, except during exit, but since
+    // Strands do not re-register, even process death would be pointless.
+    return TheInstance;
+}
+
+const Ipc::StrandCoords&
+Ipc::Coordinator::strands() const
 {
-    // register strand
-    enrollStrand(strand);
-    // send back received message
-    SendMessage(makeAddr(strandPathAddr, strand.kidId), Message(mtRegister, strand.kidId, strand.pid));
+    return strands_;
 }