]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Support a "shared listen" concept when multiple concurrent processes listen
authorAlex Rousskov <rousskov@measurement-factory.com>
Sun, 2 May 2010 18:49:25 +0000 (12:49 -0600)
committerAlex Rousskov <rousskov@measurement-factory.com>
Sun, 2 May 2010 18:49:25 +0000 (12:49 -0600)
on the same socket. The Coordinator is responsible for opening and caching
listening sockets, using comm_open_listener() parameters supplied by remote
callers. Sendmsg/recvmsg is used to shovel socket descriptors from Coordinator
to remote callers.

If SMP is not enabled, we call the local comm_open_listener() as usual but
return the results asynchronously to avoid making SMP/nonSMP special in
the caller code.

src/ipc/Coordinator.cc
src/ipc/Coordinator.h
src/ipc/FdNotes.cc [new file with mode: 0644]
src/ipc/FdNotes.h [new file with mode: 0644]
src/ipc/Makefile.am
src/ipc/Messages.h
src/ipc/SharedListen.cc [new file with mode: 0644]
src/ipc/SharedListen.h [new file with mode: 0644]
src/ipc/StartListening.cc [new file with mode: 0644]
src/ipc/StartListening.h [new file with mode: 0644]
src/ipc/Strand.cc

index a232a83b5f3fec98dfbf5585964f7b55f3dac3b5..f1d414c80a8682f29a6abec29e3f5a7356e7c7e9 100644 (file)
@@ -7,7 +7,10 @@
 
 
 #include "config.h"
+#include "comm.h"
 #include "ipc/Coordinator.h"
+#include "ipc/FdNotes.h"
+#include "ipc/SharedListen.h"
 
 
 CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator);
@@ -50,6 +53,11 @@ void Ipc::Coordinator::receive(const TypedMsgHdr& message)
         handleRegistrationRequest(StrandCoord(message));
         break;
 
+    case mtSharedListenRequest:
+        debugs(54, 6, HERE << "Shared listen request");
+        handleSharedListenRequest(SharedListenRequest(message));
+        break;
+
     case mtDescriptorGet:
         debugs(54, 6, HERE << "Descriptor get request");
         handleDescriptorGet(Descriptor(message));
@@ -71,6 +79,50 @@ void Ipc::Coordinator::handleRegistrationRequest(const StrandCoord& strand)
     SendMessage(MakeAddr(strandAddrPfx, strand.kidId), message);
 }
 
+void
+Ipc::Coordinator::handleSharedListenRequest(const SharedListenRequest& request)
+{
+    debugs(54, 4, HERE << "kid" << request.requestorId <<
+        " needs shared listen FD for " << request.params.addr);
+    Listeners::const_iterator i = listeners.find(request.params);
+    int errNo = 0;
+    const int sock = (i != listeners.end()) ?
+        i->second : openListenSocket(request, errNo);
+
+    debugs(54, 3, HERE << "sending shared listen FD " << sock << " for " <<
+        request.params.addr << " to kid" << request.requestorId <<
+        " mapId=" << request.mapId);
+
+    SharedListenResponse response(sock, errNo, request.mapId);
+    TypedMsgHdr message;
+    response.pack(message);
+    SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
+}
+
+int
+Ipc::Coordinator::openListenSocket(const SharedListenRequest& request,
+        int &errNo)
+{
+    const OpenListenerParams &p = request.params;
+
+    debugs(54, 6, HERE << "opening listen FD at " << p.addr << " for kid" <<
+        request.requestorId);
+
+    IpAddress addr = p.addr; // comm_open_listener may modify it
+
+    enter_suid();
+    const int sock = comm_open_listener(p.sock_type, p.proto, addr, p.flags,
+        FdNote(p.fdNote));
+    errNo = (sock >= 0) ? 0 : errno;
+    leave_suid();
+
+    // cache positive results
+    if (sock >= 0)
+        listeners[request.params] = sock;
+
+    return sock;
+}
+
 void Ipc::Coordinator::handleDescriptorGet(const Descriptor& request)
 {
     // XXX: hack: create descriptor here
index 80e150a76824f6b10a08498fac3daae4efacb44a..6de739420268051bdd700617e8c1200104fdfcf7 100644 (file)
 
 
 #include "Array.h"
+#include <map>
 #include "ipc/Port.h"
 #include "ipc/Messages.h"
+#include "ipc/SharedListen.h"
 
 namespace Ipc
 {
@@ -35,11 +37,20 @@ protected:
     void registerStrand(const StrandCoord &); ///< adds or updates existing
     void handleRegistrationRequest(const StrandCoord &); ///< register,ACK
 
+    /// returns cached socket or calls openListenSocket()
+    void handleSharedListenRequest(const SharedListenRequest& request);
     void handleDescriptorGet(const Descriptor& request);
 
+    /// calls comm_open_listener()
+    int openListenSocket(const SharedListenRequest& request, int &errNo);
+
 private:
     typedef Vector<StrandCoord> Strands; ///< unsorted strands
     Strands strands; ///< registered processes and threads
+
+    typedef std::map<OpenListenerParams, int> Listeners; ///< params:fd map
+    Listeners listeners; ///< cached comm_open_listener() results
+
     static Coordinator* TheInstance; ///< the only class instance in existence
 
     CBDATA_CLASS2(Coordinator);
diff --git a/src/ipc/FdNotes.cc b/src/ipc/FdNotes.cc
new file mode 100644 (file)
index 0000000..1ab4cd3
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 54    Interprocess Communication
+ *
+ */
+
+#include "config.h"
+#include "Debug.h"
+#include "ipc/FdNotes.h"
+
+
+const char *
+Ipc::FdNote(int fdNoteId)
+{
+    static const char *FdNotes[Ipc::fdnEnd] = {
+        "None", // fdnNone
+        "HTTP Socket" // fdnHttpSocket
+    };
+
+    if (fdnNone < fdNoteId && fdNoteId < fdnEnd)
+        return FdNotes[fdNoteId];
+
+    debugs(54, 1, HERE << "salvaged bug: wrong fd_note ID: " << fdNoteId);
+    return FdNotes[fdnNone];
+}
diff --git a/src/ipc/FdNotes.h b/src/ipc/FdNotes.h
new file mode 100644 (file)
index 0000000..8fa5cb7
--- /dev/null
@@ -0,0 +1,23 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 54    Interprocess Communication
+ *
+ */
+
+#ifndef SQUID_IPC_FD_NOTES_H
+#define SQUID_IPC_FD_NOTES_H
+
+namespace Ipc
+{
+
+/// We cannot send char* FD notes to other processes. Pass int IDs and convert.
+
+typedef enum { fdnNone, fdnHttpSocket, fdnEnd } FdNoteId; ///< fd_note() label
+
+extern const char *FdNote(int fdNodeId); ///< converts FdNoteId into a string
+
+} // namespace Ipc;
+
+
+#endif /* SQUID_IPC_FD_NOTES_H */
index 3c901415c9f1dcd54f612cfb83aeac7a31a44f89..c81662c48298f22cd163e491384020d6612a0802 100644 (file)
@@ -4,12 +4,18 @@ include $(top_srcdir)/src/TestHeaders.am
 noinst_LTLIBRARIES = libipc.la
 
 libipc_la_SOURCES = \
+       FdNotes.cc \
+       FdNotes.h \
        Kid.cc \
        Kid.h \
        Kids.cc \
        Kids.h \
        Messages.cc \
        Messages.h \
+       StartListening.cc \
+       StartListening.h \
+       SharedListen.cc \
+       SharedListen.h \
        TypedMsgHdr.cc \
        TypedMsgHdr.h \
        Coordinator.cc \
index d545b21b74c5b49b76817654ea9a8065e146f292..31cb6cb1658aa9534d15ce5fafb7538f42eb2b4b 100644 (file)
@@ -20,6 +20,7 @@ namespace Ipc
 class TypedMsgHdr;
 
 typedef enum { mtNone = 0, mtRegistration,
+    mtSharedListenRequest, mtSharedListenResponse,
     mtDescriptorGet, mtDescriptorPut } MessageType;
 
 /// Strand location details
diff --git a/src/ipc/SharedListen.cc b/src/ipc/SharedListen.cc
new file mode 100644 (file)
index 0000000..9e46fcb
--- /dev/null
@@ -0,0 +1,149 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 54    Interprocess Communication
+ *
+ */
+
+#include "config.h"
+#include <map>
+#include "comm.h"
+#include "ipc/Port.h"
+#include "ipc/Messages.h"
+#include "ipc/Kids.h"
+#include "ipc/TypedMsgHdr.h"
+#include "ipc/StartListening.h"
+#include "ipc/SharedListen.h"
+
+
+/// holds information necessary to handle JoinListen response
+class PendingOpenRequest
+{
+public:
+    Ipc::OpenListenerParams params; ///< actual comm_open_sharedListen() parameters
+    AsyncCall::Pointer callback; // who to notify
+};
+
+/// maps ID assigned at request time to the response callback
+typedef std::map<int, PendingOpenRequest> SharedListenRequestMap;
+static SharedListenRequestMap TheSharedListenRequestMap;
+
+static int
+AddToMap(const PendingOpenRequest &por)
+{
+    // find unused ID using linear seach; there should not be many entries
+    for (int id = 0; true; ++id) {
+        if (TheSharedListenRequestMap.find(id) == TheSharedListenRequestMap.end()) {
+            TheSharedListenRequestMap[id] = por;
+            return id;
+        }
+    }
+    assert(false); // not reached
+    return -1;
+}
+
+Ipc::OpenListenerParams::OpenListenerParams()
+{
+    xmemset(this, 0, sizeof(*this));
+}
+
+bool
+Ipc::OpenListenerParams::operator <(const OpenListenerParams &p) const
+{
+    if (sock_type != p.sock_type)
+        return sock_type < p.sock_type;
+
+    if (proto != p.proto)
+        return proto < p.proto;
+
+    // ignore flags and fdNote differences because they do not affect binding
+
+    return addr.compareWhole(p.addr) < 0;
+}
+
+
+
+Ipc::SharedListenRequest::SharedListenRequest(): requestorId(-1), mapId(-1)
+{
+    // caller will then set public data members
+}
+
+Ipc::SharedListenRequest::SharedListenRequest(const TypedMsgHdr &hdrMsg)
+{
+    hdrMsg.getData(mtSharedListenRequest, this, sizeof(*this));
+}
+
+void Ipc::SharedListenRequest::pack(TypedMsgHdr &hdrMsg) const
+{
+    hdrMsg.putData(mtSharedListenRequest, this, sizeof(*this));
+}
+
+
+Ipc::SharedListenResponse::SharedListenResponse(int aFd, int anErrNo, int aMapId):
+    fd(aFd), errNo(anErrNo), mapId(aMapId)
+{
+}
+
+Ipc::SharedListenResponse::SharedListenResponse(const TypedMsgHdr &hdrMsg):
+    fd(-1), errNo(0), mapId(-1)
+{
+    hdrMsg.getData(mtSharedListenResponse, this, sizeof(*this));
+    fd = hdrMsg.getFd();
+}
+
+void Ipc::SharedListenResponse::pack(TypedMsgHdr &hdrMsg) const
+{
+    hdrMsg.putData(mtSharedListenResponse, this, sizeof(*this));
+    hdrMsg.putFd(fd);
+}
+
+
+void Ipc::JoinSharedListen(const OpenListenerParams &params,
+    AsyncCall::Pointer &callback)
+{
+    PendingOpenRequest por;
+    por.params = params;
+    por.callback = callback;
+
+    SharedListenRequest request;
+    request.requestorId = KidIdentifier;
+    request.params = por.params;
+    request.mapId = AddToMap(por);
+
+    debugs(54, 3, HERE << "getting listening FD for " << request.params.addr <<
+        " mapId=" << request.mapId);
+
+    TypedMsgHdr message;
+    request.pack(message);
+    SendMessage(coordinatorAddr, message);
+}
+
+void Ipc::SharedListenJoined(const SharedListenResponse &response)
+{
+    const int fd = response.fd;
+
+    debugs(54, 3, HERE << "got listening FD " << fd << " errNo=" <<
+        response.errNo << " mapId=" << response.mapId);
+
+    Must(TheSharedListenRequestMap.find(response.mapId) != TheSharedListenRequestMap.end());
+    PendingOpenRequest por = TheSharedListenRequestMap[response.mapId];
+    Must(por.callback != NULL);
+    TheSharedListenRequestMap.erase(response.mapId);
+
+    if (fd >= 0) {
+        OpenListenerParams &p = por.params;
+        struct addrinfo *AI = NULL;
+        p.addr.GetAddrInfo(AI);
+        AI->ai_socktype = p.sock_type;
+        AI->ai_protocol = p.proto;
+        comm_import_opened(fd, p.addr, p.flags, FdNote(p.fdNote), AI);
+        p.addr.FreeAddrInfo(AI);
+    }
+
+    StartListeningCb *cbd =
+        dynamic_cast<StartListeningCb*>(por.callback->getDialer());
+    Must(cbd);
+    cbd->fd = fd;
+    cbd->errNo = response.errNo;
+    ScheduleCallHere(por.callback);
+}
diff --git a/src/ipc/SharedListen.h b/src/ipc/SharedListen.h
new file mode 100644 (file)
index 0000000..3b77b88
--- /dev/null
@@ -0,0 +1,74 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 54    Interprocess Communication
+ *
+ */
+
+#ifndef SQUID_IPC_SHARED_LISTEN_H
+#define SQUID_IPC_SHARED_LISTEN_H
+
+#include "base/AsyncCall.h"
+
+namespace Ipc
+{
+
+/// "shared listen" is when concurrent processes are listening on the same fd
+
+/// comm_open_listener() parameters holder
+class OpenListenerParams
+{
+public:
+    OpenListenerParams();
+
+    bool operator <(const OpenListenerParams &p) const; ///< useful for map<>
+
+    int sock_type;
+    int proto;
+    IpAddress addr; ///< will be memset and memcopied
+    int flags;
+    int fdNote; ///< index into fd_note() comment strings
+};
+
+class TypedMsgHdr;
+
+/// a request for a listen socket with given parameters
+class SharedListenRequest
+{
+public:
+    SharedListenRequest(); ///< from OpenSharedListen() which then sets public data
+    explicit SharedListenRequest(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
+    void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
+
+public:
+    int requestorId; ///< kidId of the requestor
+
+    OpenListenerParams params; ///< actual comm_open_sharedListen() parameters
+
+    int mapId; ///< to map future response to the requestor's callback
+};
+
+/// a response to SharedListenRequest
+class SharedListenResponse
+{
+public:
+    SharedListenResponse(int fd, int errNo, int mapId);
+    explicit SharedListenResponse(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
+    void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
+
+public:
+    int fd; ///< opened listening socket or -1
+    int errNo; ///< errno value from comm_open_sharedListen() call
+    int mapId; ///< to map future response to the requestor's callback
+};
+
+/// prepare and send SharedListenRequest to Coordinator
+extern void JoinSharedListen(const OpenListenerParams &, AsyncCall::Pointer &);
+
+/// process Coordinator response to SharedListenRequest
+extern void SharedListenJoined(const SharedListenResponse &response);
+
+} // namespace Ipc;
+
+
+#endif /* SQUID_IPC_SHARED_LISTEN_H */
diff --git a/src/ipc/StartListening.cc b/src/ipc/StartListening.cc
new file mode 100644 (file)
index 0000000..764be9e
--- /dev/null
@@ -0,0 +1,58 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 54    Interprocess Communication
+ *
+ */
+
+#include "config.h"
+#include "comm.h"
+#include "TextException.h"
+#include "ipc/SharedListen.h"
+#include "ipc/StartListening.h"
+
+
+Ipc::StartListeningCb::StartListeningCb(): fd(-1), errNo(0)
+{
+}
+
+Ipc::StartListeningCb::~StartListeningCb()
+{
+}
+
+std::ostream &Ipc::StartListeningCb::startPrint(std::ostream &os) const
+{
+    return os << "(FD " << fd << ", err=" << errNo;
+}
+
+
+void Ipc::StartListening(int sock_type, int proto, IpAddress &addr,
+    int flags, FdNoteId fdNote, AsyncCall::Pointer &callback)
+{
+    OpenListenerParams p;
+    p.sock_type = sock_type;
+    p.proto = proto;
+    p.addr = addr;
+    p.flags = flags;
+    p.fdNote = fdNote;
+
+    if (!opt_no_daemon && Config.main_processes > 1) { // if SMP is on, share
+        Ipc::JoinSharedListen(p, callback);
+        return; // wait for the call back
+    }
+
+    enter_suid();
+    const int sock = comm_open_listener(p.sock_type, p.proto, p.addr, p.flags,
+        FdNote(p.fdNote));
+    const int errNo = (sock >= 0) ? 0 : errno;
+    leave_suid();
+
+    debugs(54, 3, HERE << "opened listen FD " << sock << " for " << p.addr);
+
+    StartListeningCb *cbd =
+        dynamic_cast<StartListeningCb*>(callback->getDialer());
+    Must(cbd);
+    cbd->fd = sock;
+    cbd->errNo = errNo;
+    ScheduleCallHere(callback);
+}
diff --git a/src/ipc/StartListening.h b/src/ipc/StartListening.h
new file mode 100644 (file)
index 0000000..9bf8e44
--- /dev/null
@@ -0,0 +1,43 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 54    Interprocess Communication
+ *
+ */
+
+#ifndef SQUID_IPC_START_LISTENING_H
+#define SQUID_IPC_START_LISTENING_H
+
+#include <iosfwd>
+#include "ipc/FdNotes.h"
+#include "base/AsyncCall.h"
+
+class IpAddress;
+
+namespace Ipc
+{
+
+/// common API for all StartListening() callbacks
+class StartListeningCb
+{
+public:
+    StartListeningCb();
+    virtual ~StartListeningCb();
+
+    /// starts printing arguments, return os
+    std::ostream &startPrint(std::ostream &os) const;
+
+public:
+    int fd; ///< opened listening socket or -1
+    int errNo; ///< errno value from the comm_open_listener() call
+};
+
+/// Depending on whether SMP is on, either ask Coordinator to send us
+/// the listening FD or call comm_open_listener() directly.
+extern void StartListening(int sock_type, int proto, IpAddress &addr,
+    int flags, FdNoteId fdNote, AsyncCall::Pointer &callback);
+
+} // namespace Ipc;
+
+
+#endif /* SQUID_IPC_START_LISTENING_H */
index 3bf75c7dfaf602cd9d56bc1f904e528859e7596e..c8f2c4119cc322fe16741145cb369e02b606f1cf 100644 (file)
@@ -8,6 +8,7 @@
 #include "config.h"
 #include "ipc/Strand.h"
 #include "ipc/Messages.h"
+#include "ipc/SharedListen.h"
 #include "ipc/Kids.h"
 
 
@@ -45,6 +46,10 @@ void Ipc::Strand::receive(const TypedMsgHdr &message)
         handleRegistrationResponse(StrandCoord(message));
         break;
 
+    case mtSharedListenResponse:
+        SharedListenJoined(SharedListenResponse(message));
+        break;
+
     case mtDescriptorPut:
         putDescriptor(Descriptor(message));
         break;