]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Deduplicating IPC strand messages (#756)
authorEduard Bagdasaryan <eduard.bagdasaryan@measurement-factory.com>
Fri, 22 Jan 2021 17:20:30 +0000 (17:20 +0000)
committerSquid Anubis <squid-anubis@squid-cache.org>
Sun, 24 Jan 2021 22:07:39 +0000 (22:07 +0000)
No functionality changes other than minor debugging improvements.

* replaced identical (except for the message kind value) HereIamMessage
  and StrandSearchResponse classes with StrandMessage
* reduced code duplication with a new StrandMessage::NotifyCoordinator()
* split TypedMsgHdr::type() into unchecked rawType() and checked type()
* renamed and documented several Ipc::MessageType enum values

The above code improvements will help with adding more IPC messages.

15 files changed:
src/DiskIO/IpcIo/IpcIoFile.cc
src/DiskIO/IpcIo/IpcIoFile.h
src/ipc/Coordinator.cc
src/ipc/Coordinator.h
src/ipc/Messages.h
src/ipc/Port.cc
src/ipc/Strand.cc
src/ipc/Strand.h
src/ipc/StrandCoord.cc
src/ipc/StrandCoord.h
src/ipc/StrandSearch.cc
src/ipc/StrandSearch.h
src/ipc/TypedMsgHdr.cc
src/ipc/TypedMsgHdr.h
src/ipc/forward.h

index 3a65b47a784ca241ef2c923c2021fb173773b6f6..90c8d9cb35fa323b32a63ee03b5219c57378eeeb 100644 (file)
@@ -23,6 +23,7 @@
 #include "ipc/Messages.h"
 #include "ipc/Port.h"
 #include "ipc/Queue.h"
+#include "ipc/StrandCoord.h"
 #include "ipc/StrandSearch.h"
 #include "ipc/UdsOp.h"
 #include "sbuf/SBuf.h"
@@ -143,11 +144,7 @@ IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
 
         queue->localRateLimit().store(config.ioRate);
 
-        Ipc::HereIamMessage ann(Ipc::StrandCoord(KidIdentifier, myPid));
-        ann.strand.tag = dbName;
-        Ipc::TypedMsgHdr message;
-        ann.pack(message);
-        SendMessage(Ipc::Port::CoordinatorAddr(), message);
+        Ipc::StrandMessage::NotifyCoordinator(Ipc::mtRegisterStrand, dbName.termedBuf());
 
         ioRequestor->ioCompletedNotification();
         return;
@@ -168,7 +165,7 @@ IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
 }
 
 void
-IpcIoFile::openCompleted(const Ipc::StrandSearchResponse *const response)
+IpcIoFile::openCompleted(const Ipc::StrandMessage *const response)
 {
     Must(diskId < 0); // we do not know our disker yet
 
@@ -455,7 +452,7 @@ IpcIoFile::canWait() const
 
 /// called when coordinator responds to worker open request
 void
-IpcIoFile::HandleOpenResponse(const Ipc::StrandSearchResponse &response)
+IpcIoFile::HandleOpenResponse(const Ipc::StrandMessage &response)
 {
     debugs(47, 7, HERE << "coordinator response to open request");
     for (IpcIoFileList::iterator i = WaitingForOpen.begin();
index a15227c9234b3c8ee56708c5408ea362b5732297..f3b724927a03ab7d9571ad9e5659753743b11ee9 100644 (file)
@@ -87,7 +87,7 @@ public:
     virtual bool ioInProgress() const;
 
     /// handle open response from coordinator
-    static void HandleOpenResponse(const Ipc::StrandSearchResponse &response);
+    static void HandleOpenResponse(const Ipc::StrandMessage &);
 
     /// handle queue push notifications from worker or disker
     static void HandleNotification(const Ipc::TypedMsgHdr &msg);
@@ -99,7 +99,7 @@ public:
 
 protected:
     friend class IpcIoPendingRequest;
-    void openCompleted(const Ipc::StrandSearchResponse *const response);
+    void openCompleted(const Ipc::StrandMessage *);
     void readCompleted(ReadRequest *readRequest, IpcIoMsg *const response);
     void writeCompleted(WriteRequest *writeRequest, const IpcIoMsg *const response);
     bool canWait() const;
index 60ca2c537a35e707c90612423213fe7bd3fce577..012e6fb8e26d250bbc604ef18f4ef3e830b02ab4 100644 (file)
@@ -78,13 +78,13 @@ void Ipc::Coordinator::registerStrand(const StrandCoord& strand)
 
 void Ipc::Coordinator::receive(const TypedMsgHdr& message)
 {
-    switch (message.type()) {
-    case mtRegistration:
+    switch (message.rawType()) {
+    case mtRegisterStrand:
         debugs(54, 6, HERE << "Registration request");
-        handleRegistrationRequest(HereIamMessage(message));
+        handleRegistrationRequest(StrandMessage(message));
         break;
 
-    case mtStrandSearchRequest: {
+    case mtFindStrand: {
         const StrandSearchRequest sr(message);
         debugs(54, 6, HERE << "Strand search request: " << sr.requestorId <<
                " tag: " << sr.tag);
@@ -128,18 +128,18 @@ void Ipc::Coordinator::receive(const TypedMsgHdr& message)
 #endif
 
     default:
-        debugs(54, DBG_IMPORTANT, HERE << "Unhandled message type: " << message.type());
+        debugs(54, DBG_IMPORTANT, "WARNING: Ignoring IPC message with an unknown type: " << message.rawType());
         break;
     }
 }
 
-void Ipc::Coordinator::handleRegistrationRequest(const HereIamMessage& msg)
+void Ipc::Coordinator::handleRegistrationRequest(const StrandMessage& msg)
 {
     registerStrand(msg.strand);
 
     // send back an acknowledgement; TODO: remove as not needed?
     TypedMsgHdr message;
-    msg.pack(message);
+    msg.pack(mtStrandRegistered, message);
     SendMessage(MakeAddr(strandAddrLabel, msg.strand.kidId), message);
 }
 
@@ -222,9 +222,9 @@ Ipc::Coordinator::notifySearcher(const Ipc::StrandSearchRequest &request,
 {
     debugs(54, 3, HERE << "tell kid" << request.requestorId << " that " <<
            request.tag << " is kid" << strand.kidId);
-    const StrandSearchResponse response(strand);
+    const StrandMessage response(strand);
     TypedMsgHdr message;
-    response.pack(message);
+    response.pack(mtStrandReady, message);
     SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
 }
 
index 2a00600bcbb8089962359f1c0e293c41794cb45e..f5ec5123d178f96356bbb6f7ee4df1d69b429dc9 100644 (file)
@@ -47,7 +47,7 @@ protected:
 
     StrandCoord* findStrand(int kidId); ///< registered strand or NULL
     void registerStrand(const StrandCoord &); ///< adds or updates existing
-    void handleRegistrationRequest(const HereIamMessage &); ///< register,ACK
+    void handleRegistrationRequest(const StrandMessage &); ///< register,ACK
 
     /// answer the waiting search request
     void notifySearcher(const StrandSearchRequest &request, const StrandCoord&);
index f1d9cfdbed6df718e18005458d0dd6f92c9eb75d..941d7fcd889a7feceea179fec1226c19af165e3e 100644 (file)
@@ -17,16 +17,30 @@ namespace Ipc
 {
 
 /// message class identifier
-typedef enum { mtNone = 0, mtRegistration,
-               mtStrandSearchRequest, mtStrandSearchResponse,
-               mtSharedListenRequest, mtSharedListenResponse,
+typedef enum { mtNone = 0, ///< unspecified or unknown message kind; unused on the wire
+
+               mtRegisterStrand, ///< notifies about our strand existence
+               mtStrandRegistered, ///< acknowledges mtRegisterStrand acceptance
+
+               mtFindStrand, ///< a worker requests a strand from Coordinator
+               mtStrandReady, ///< an mtFindStrand answer: the strand exists and should be usable
+
+               mtSharedListenRequest,
+               mtSharedListenResponse,
+
                mtIpcIoNotification,
+
                mtCollapsedForwardingNotification,
-               mtCacheMgrRequest, mtCacheMgrResponse
+
+               mtCacheMgrRequest,
+               mtCacheMgrResponse,
+
 #if SQUID_SNMP
-               ,
-               mtSnmpRequest, mtSnmpResponse
+               mtSnmpRequest,
+               mtSnmpResponse,
 #endif
+
+               mtEnd ///< for message kind range checks; unused on the wire
              } MessageType;
 
 } // namespace Ipc;
index 24714fdbb5b0f54cb1bbea8680935eb5ee2e1d5e..f3d1a3853f854ce8f6545107fa508edd2b228d49 100644 (file)
@@ -79,6 +79,7 @@ void Ipc::Port::noteRead(const CommIoCbParams& params)
            " [" << this << ']');
     if (params.flag == Comm::OK) {
         assert(params.buf == buf.raw());
+        debugs(54, 6, "message type: " << buf.rawType());
         receive(buf);
     }
     // TODO: if there was a fatal error on our socket, close the socket before
index ad417d47afe504c94b388c082e7585f3a3357510..6d844e9323feebd02d4dee4a6cce3bd0b67a4249 100644 (file)
@@ -53,20 +53,16 @@ void Ipc::Strand::registerSelf()
     debugs(54, 6, HERE);
     Must(!isRegistered);
 
-    HereIamMessage ann(StrandCoord(KidIdentifier, getpid()));
-    TypedMsgHdr message;
-    ann.pack(message);
-    SendMessage(Port::CoordinatorAddr(), message);
+    StrandMessage::NotifyCoordinator(mtRegisterStrand, nullptr);
     setTimeout(6, "Ipc::Strand::timeoutHandler"); // TODO: make 6 configurable?
 }
 
 void Ipc::Strand::receive(const TypedMsgHdr &message)
 {
-    debugs(54, 6, HERE << message.type());
-    switch (message.type()) {
+    switch (message.rawType()) {
 
-    case mtRegistration:
-        handleRegistrationResponse(HereIamMessage(message));
+    case mtStrandRegistered:
+        handleRegistrationResponse(StrandMessage(message));
         break;
 
     case mtSharedListenResponse:
@@ -74,8 +70,8 @@ void Ipc::Strand::receive(const TypedMsgHdr &message)
         break;
 
 #if HAVE_DISKIO_MODULE_IPCIO
-    case mtStrandSearchResponse:
-        IpcIoFile::HandleOpenResponse(StrandSearchResponse(message));
+    case mtStrandReady:
+        IpcIoFile::HandleOpenResponse(StrandMessage(message));
         break;
 
     case mtIpcIoNotification:
@@ -114,12 +110,13 @@ void Ipc::Strand::receive(const TypedMsgHdr &message)
 #endif
 
     default:
-        debugs(54, DBG_IMPORTANT, HERE << "Unhandled message type: " << message.type());
+        debugs(54, DBG_IMPORTANT, "WARNING: Ignoring IPC message with an unknown type: " << message.rawType());
         break;
     }
 }
 
-void Ipc::Strand::handleRegistrationResponse(const HereIamMessage &msg)
+void
+Ipc::Strand::handleRegistrationResponse(const StrandMessage &msg)
 {
     // handle registration response from the coordinator; it could be stale
     if (msg.strand.kidId == KidIdentifier && msg.strand.pid == getpid()) {
index c6392494d1e179fa172eef3131ce48052e927fbe..9ac1273da0614809f105ff6b4cc4ff5aef6ab9a5 100644 (file)
@@ -39,7 +39,7 @@ protected:
 
 private:
     void registerSelf(); /// let Coordinator know this strand exists
-    void handleRegistrationResponse(const HereIamMessage &msg);
+    void handleRegistrationResponse(const StrandMessage &);
     void handleCacheMgrRequest(const Mgr::Request& request);
     void handleCacheMgrResponse(const Mgr::Response& response);
 #if SQUID_SNMP
index c65581f0ee9f353659b0bd0c21e3df0dcf99493e..fc35b0fb86c1300f0da7bb13d74a174505983613 100644 (file)
@@ -10,7 +10,8 @@
 
 #include "squid.h"
 #include "Debug.h"
-#include "ipc/Messages.h"
+#include "globals.h"
+#include "ipc/Port.h"
 #include "ipc/StrandCoord.h"
 #include "ipc/TypedMsgHdr.h"
 
@@ -37,20 +38,32 @@ void Ipc::StrandCoord::pack(TypedMsgHdr &hdrMsg) const
     hdrMsg.putString(tag);
 }
 
-Ipc::HereIamMessage::HereIamMessage(const StrandCoord &aStrand):
+Ipc::StrandMessage::StrandMessage(const StrandCoord &aStrand):
     strand(aStrand)
 {
 }
 
-Ipc::HereIamMessage::HereIamMessage(const TypedMsgHdr &hdrMsg)
+Ipc::StrandMessage::StrandMessage(const TypedMsgHdr &hdrMsg)
 {
-    hdrMsg.checkType(mtRegistration);
     strand.unpack(hdrMsg);
 }
 
-void Ipc::HereIamMessage::pack(TypedMsgHdr &hdrMsg) const
+void
+Ipc::StrandMessage::pack(const MessageType messageType, TypedMsgHdr &hdrMsg) const
 {
-    hdrMsg.setType(mtRegistration);
+    hdrMsg.setType(messageType);
     strand.pack(hdrMsg);
 }
 
+void
+Ipc::StrandMessage::NotifyCoordinator(const MessageType msgType, const char *tag)
+{
+    static const auto pid = getpid();
+    StrandMessage message(StrandCoord(KidIdentifier, pid));
+    if (tag)
+        message.strand.tag = tag;
+    TypedMsgHdr hdr;
+    message.pack(msgType, hdr);
+    SendMessage(Port::CoordinatorAddr(), hdr);
+}
+
index d58e96e3efa33b40714d9eb680bbb7381619b9bb..2758d7777c4d24e7820938c4b141b17942e1043f 100644 (file)
@@ -10,6 +10,7 @@
 #define SQUID_IPC_STRAND_COORD_H
 
 #include "ipc/forward.h"
+#include "ipc/Messages.h"
 #include "SquidString.h"
 
 namespace Ipc
@@ -32,16 +33,19 @@ public:
     String tag; ///< optional unique well-known key (e.g., cache_dir path)
 };
 
-/// strand registration with Coordinator (also used as an ACK)
-class HereIamMessage
+/// an IPC message carrying StrandCoord
+class StrandMessage
 {
 public:
-    explicit HereIamMessage(const StrandCoord &strand); ///< from registrant
-    explicit HereIamMessage(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
-    void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
+    explicit StrandMessage(const StrandCoord &);
+    explicit StrandMessage(const TypedMsgHdr &);
+    void pack(MessageType, TypedMsgHdr &) const;
+
+    /// creates and sends StrandMessage to Coordinator
+    static void NotifyCoordinator(MessageType, const char *tag);
 
 public:
-    StrandCoord strand; ///< registrant coordinates and related details
+    StrandCoord strand; ///< messageType-specific coordinates (e.g., sender)
 };
 
 } // namespace Ipc;
index 30d8b1b1e4d4a9a73bb4c76aef9ababcdebb033f..13f9406ebbd032bdacd57a60d604a0d52ed49f53 100644 (file)
@@ -20,34 +20,15 @@ Ipc::StrandSearchRequest::StrandSearchRequest(): requestorId(-1)
 Ipc::StrandSearchRequest::StrandSearchRequest(const TypedMsgHdr &hdrMsg):
     requestorId(-1)
 {
-    hdrMsg.checkType(mtStrandSearchRequest);
+    hdrMsg.checkType(mtFindStrand);
     hdrMsg.getPod(requestorId);
     hdrMsg.getString(tag);
 }
 
 void Ipc::StrandSearchRequest::pack(TypedMsgHdr &hdrMsg) const
 {
-    hdrMsg.setType(mtStrandSearchRequest);
+    hdrMsg.setType(mtFindStrand);
     hdrMsg.putPod(requestorId);
     hdrMsg.putString(tag);
 }
 
-/* StrandSearchResponse */
-
-Ipc::StrandSearchResponse::StrandSearchResponse(const Ipc::StrandCoord &aStrand):
-    strand(aStrand)
-{
-}
-
-Ipc::StrandSearchResponse::StrandSearchResponse(const TypedMsgHdr &hdrMsg)
-{
-    hdrMsg.checkType(mtStrandSearchResponse);
-    strand.unpack(hdrMsg);
-}
-
-void Ipc::StrandSearchResponse::pack(TypedMsgHdr &hdrMsg) const
-{
-    hdrMsg.setType(mtStrandSearchResponse);
-    strand.pack(hdrMsg);
-}
-
index ef76b60870199292994f17d335b788c490a47f77..3c3cd9d8cf8a44b26dd04f85fe1c27d3f9eadfeb 100644 (file)
@@ -29,18 +29,6 @@ public:
     String tag; ///< set when looking for a matching StrandCoord::tag
 };
 
-/// asynchronous strand search response
-class StrandSearchResponse
-{
-public:
-    StrandSearchResponse(const StrandCoord &strand);
-    explicit StrandSearchResponse(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
-    void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
-
-public:
-    StrandCoord strand; ///< answer matching StrandSearchRequest criteria
-};
-
 } // namespace Ipc;
 
 #endif /* SQUID_IPC_STRAND_SEARCH_H */
index fadaf75ba90fddc951ae4bc450c382db2746e1fe..c115944f9350cc668a8c1f442c943b59fbf3818b 100644 (file)
@@ -81,13 +81,6 @@ void Ipc::TypedMsgHdr::sync()
     offset = 0;
 }
 
-int
-Ipc::TypedMsgHdr::type() const
-{
-    Must(msg_iovlen == 1);
-    return data.type_;
-}
-
 void
 Ipc::TypedMsgHdr::address(const struct sockaddr_un& addr)
 {
@@ -100,7 +93,7 @@ Ipc::TypedMsgHdr::address(const struct sockaddr_un& addr)
 void
 Ipc::TypedMsgHdr::checkType(int destType) const
 {
-    Must(type() == destType);
+    Must(rawType() == destType);
 }
 
 void
index 37d471a69809496d80efe3cc32d2f0b68c4c5cbd..5fc7ac443fac9b1901f0111901bf60f41e2a08fe 100644 (file)
@@ -12,6 +12,7 @@
 #define SQUID_IPC_TYPED_MSG_HDR_H
 
 #include "compat/cmsg.h"
+#include "ipc/Messages.h"
 #if HAVE_SYS_SOCKET_H
 #include <sys/socket.h>
 #endif
@@ -43,7 +44,9 @@ public:
     /* message type manipulation; these must be called before put/get*() */
     void setType(int aType); ///< sets message type; use MessageType enum
     void checkType(int aType) const; ///< throws if stored type is not aType
-    int type() const; ///< returns stored type or zero if none
+    /// received or set message kind; may not be a MessageType value
+    /// \returns 0 if no message kind has been received or set
+    int rawType() const { return msg_iov ? data.type_ : 0; }
 
     /* access for Plain Old Data (POD)-based message parts */
     template <class Pod>
index faeed7fe9b2baabe26ce66d6c0c189f0f127a0b8..8fff7804a122aa67733b5882e2ad3b22fd9a0d94 100644 (file)
@@ -16,8 +16,7 @@ namespace Ipc
 
 class TypedMsgHdr;
 class StrandCoord;
-class HereIamMessage;
-class StrandSearchResponse;
+class StrandMessage;
 class Forwarder;
 class Inquirer;
 class Request;