From: Eduard Bagdasaryan Date: Fri, 22 Jan 2021 17:20:30 +0000 (+0000) Subject: Deduplicating IPC strand messages (#756) X-Git-Tag: 4.15-20210522-snapshot~37 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=6ccfd70a3b54bf2c7c7db749d58e17e49602fd9b;p=thirdparty%2Fsquid.git Deduplicating IPC strand messages (#756) No functionality changes other than minor debugging improvements. * replaced identical (except for the message kind value) HereIamMessage and StrandSearchResponse classes with StrandMessage * reduced code duplication with a new StrandMessage::NotifyCoordinator() * split TypedMsgHdr::type() into unchecked rawType() and checked type() * renamed and documented several Ipc::MessageType enum values The above code improvements will help with adding more IPC messages. --- diff --git a/src/DiskIO/IpcIo/IpcIoFile.cc b/src/DiskIO/IpcIo/IpcIoFile.cc index 3a65b47a78..90c8d9cb35 100644 --- a/src/DiskIO/IpcIo/IpcIoFile.cc +++ b/src/DiskIO/IpcIo/IpcIoFile.cc @@ -23,6 +23,7 @@ #include "ipc/Messages.h" #include "ipc/Port.h" #include "ipc/Queue.h" +#include "ipc/StrandCoord.h" #include "ipc/StrandSearch.h" #include "ipc/UdsOp.h" #include "sbuf/SBuf.h" @@ -143,11 +144,7 @@ IpcIoFile::open(int flags, mode_t mode, RefCount callback) queue->localRateLimit().store(config.ioRate); - Ipc::HereIamMessage ann(Ipc::StrandCoord(KidIdentifier, myPid)); - ann.strand.tag = dbName; - Ipc::TypedMsgHdr message; - ann.pack(message); - SendMessage(Ipc::Port::CoordinatorAddr(), message); + Ipc::StrandMessage::NotifyCoordinator(Ipc::mtRegisterStrand, dbName.termedBuf()); ioRequestor->ioCompletedNotification(); return; @@ -168,7 +165,7 @@ IpcIoFile::open(int flags, mode_t mode, RefCount callback) } void -IpcIoFile::openCompleted(const Ipc::StrandSearchResponse *const response) +IpcIoFile::openCompleted(const Ipc::StrandMessage *const response) { Must(diskId < 0); // we do not know our disker yet @@ -455,7 +452,7 @@ IpcIoFile::canWait() const /// called when coordinator responds to worker open request void -IpcIoFile::HandleOpenResponse(const Ipc::StrandSearchResponse &response) +IpcIoFile::HandleOpenResponse(const Ipc::StrandMessage &response) { debugs(47, 7, HERE << "coordinator response to open request"); for (IpcIoFileList::iterator i = WaitingForOpen.begin(); diff --git a/src/DiskIO/IpcIo/IpcIoFile.h b/src/DiskIO/IpcIo/IpcIoFile.h index a15227c923..f3b724927a 100644 --- a/src/DiskIO/IpcIo/IpcIoFile.h +++ b/src/DiskIO/IpcIo/IpcIoFile.h @@ -87,7 +87,7 @@ public: virtual bool ioInProgress() const; /// handle open response from coordinator - static void HandleOpenResponse(const Ipc::StrandSearchResponse &response); + static void HandleOpenResponse(const Ipc::StrandMessage &); /// handle queue push notifications from worker or disker static void HandleNotification(const Ipc::TypedMsgHdr &msg); @@ -99,7 +99,7 @@ public: protected: friend class IpcIoPendingRequest; - void openCompleted(const Ipc::StrandSearchResponse *const response); + void openCompleted(const Ipc::StrandMessage *); void readCompleted(ReadRequest *readRequest, IpcIoMsg *const response); void writeCompleted(WriteRequest *writeRequest, const IpcIoMsg *const response); bool canWait() const; diff --git a/src/ipc/Coordinator.cc b/src/ipc/Coordinator.cc index 60ca2c537a..012e6fb8e2 100644 --- a/src/ipc/Coordinator.cc +++ b/src/ipc/Coordinator.cc @@ -78,13 +78,13 @@ void Ipc::Coordinator::registerStrand(const StrandCoord& strand) void Ipc::Coordinator::receive(const TypedMsgHdr& message) { - switch (message.type()) { - case mtRegistration: + switch (message.rawType()) { + case mtRegisterStrand: debugs(54, 6, HERE << "Registration request"); - handleRegistrationRequest(HereIamMessage(message)); + handleRegistrationRequest(StrandMessage(message)); break; - case mtStrandSearchRequest: { + case mtFindStrand: { const StrandSearchRequest sr(message); debugs(54, 6, HERE << "Strand search request: " << sr.requestorId << " tag: " << sr.tag); @@ -128,18 +128,18 @@ void Ipc::Coordinator::receive(const TypedMsgHdr& message) #endif default: - debugs(54, DBG_IMPORTANT, HERE << "Unhandled message type: " << message.type()); + debugs(54, DBG_IMPORTANT, "WARNING: Ignoring IPC message with an unknown type: " << message.rawType()); break; } } -void Ipc::Coordinator::handleRegistrationRequest(const HereIamMessage& msg) +void Ipc::Coordinator::handleRegistrationRequest(const StrandMessage& msg) { registerStrand(msg.strand); // send back an acknowledgement; TODO: remove as not needed? TypedMsgHdr message; - msg.pack(message); + msg.pack(mtStrandRegistered, message); SendMessage(MakeAddr(strandAddrLabel, msg.strand.kidId), message); } @@ -222,9 +222,9 @@ Ipc::Coordinator::notifySearcher(const Ipc::StrandSearchRequest &request, { debugs(54, 3, HERE << "tell kid" << request.requestorId << " that " << request.tag << " is kid" << strand.kidId); - const StrandSearchResponse response(strand); + const StrandMessage response(strand); TypedMsgHdr message; - response.pack(message); + response.pack(mtStrandReady, message); SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message); } diff --git a/src/ipc/Coordinator.h b/src/ipc/Coordinator.h index 2a00600bcb..f5ec5123d1 100644 --- a/src/ipc/Coordinator.h +++ b/src/ipc/Coordinator.h @@ -47,7 +47,7 @@ protected: StrandCoord* findStrand(int kidId); ///< registered strand or NULL void registerStrand(const StrandCoord &); ///< adds or updates existing - void handleRegistrationRequest(const HereIamMessage &); ///< register,ACK + void handleRegistrationRequest(const StrandMessage &); ///< register,ACK /// answer the waiting search request void notifySearcher(const StrandSearchRequest &request, const StrandCoord&); diff --git a/src/ipc/Messages.h b/src/ipc/Messages.h index f1d9cfdbed..941d7fcd88 100644 --- a/src/ipc/Messages.h +++ b/src/ipc/Messages.h @@ -17,16 +17,30 @@ namespace Ipc { /// message class identifier -typedef enum { mtNone = 0, mtRegistration, - mtStrandSearchRequest, mtStrandSearchResponse, - mtSharedListenRequest, mtSharedListenResponse, +typedef enum { mtNone = 0, ///< unspecified or unknown message kind; unused on the wire + + mtRegisterStrand, ///< notifies about our strand existence + mtStrandRegistered, ///< acknowledges mtRegisterStrand acceptance + + mtFindStrand, ///< a worker requests a strand from Coordinator + mtStrandReady, ///< an mtFindStrand answer: the strand exists and should be usable + + mtSharedListenRequest, + mtSharedListenResponse, + mtIpcIoNotification, + mtCollapsedForwardingNotification, - mtCacheMgrRequest, mtCacheMgrResponse + + mtCacheMgrRequest, + mtCacheMgrResponse, + #if SQUID_SNMP - , - mtSnmpRequest, mtSnmpResponse + mtSnmpRequest, + mtSnmpResponse, #endif + + mtEnd ///< for message kind range checks; unused on the wire } MessageType; } // namespace Ipc; diff --git a/src/ipc/Port.cc b/src/ipc/Port.cc index 24714fdbb5..f3d1a3853f 100644 --- a/src/ipc/Port.cc +++ b/src/ipc/Port.cc @@ -79,6 +79,7 @@ void Ipc::Port::noteRead(const CommIoCbParams& params) " [" << this << ']'); if (params.flag == Comm::OK) { assert(params.buf == buf.raw()); + debugs(54, 6, "message type: " << buf.rawType()); receive(buf); } // TODO: if there was a fatal error on our socket, close the socket before diff --git a/src/ipc/Strand.cc b/src/ipc/Strand.cc index ad417d47af..6d844e9323 100644 --- a/src/ipc/Strand.cc +++ b/src/ipc/Strand.cc @@ -53,20 +53,16 @@ void Ipc::Strand::registerSelf() debugs(54, 6, HERE); Must(!isRegistered); - HereIamMessage ann(StrandCoord(KidIdentifier, getpid())); - TypedMsgHdr message; - ann.pack(message); - SendMessage(Port::CoordinatorAddr(), message); + StrandMessage::NotifyCoordinator(mtRegisterStrand, nullptr); setTimeout(6, "Ipc::Strand::timeoutHandler"); // TODO: make 6 configurable? } void Ipc::Strand::receive(const TypedMsgHdr &message) { - debugs(54, 6, HERE << message.type()); - switch (message.type()) { + switch (message.rawType()) { - case mtRegistration: - handleRegistrationResponse(HereIamMessage(message)); + case mtStrandRegistered: + handleRegistrationResponse(StrandMessage(message)); break; case mtSharedListenResponse: @@ -74,8 +70,8 @@ void Ipc::Strand::receive(const TypedMsgHdr &message) break; #if HAVE_DISKIO_MODULE_IPCIO - case mtStrandSearchResponse: - IpcIoFile::HandleOpenResponse(StrandSearchResponse(message)); + case mtStrandReady: + IpcIoFile::HandleOpenResponse(StrandMessage(message)); break; case mtIpcIoNotification: @@ -114,12 +110,13 @@ void Ipc::Strand::receive(const TypedMsgHdr &message) #endif default: - debugs(54, DBG_IMPORTANT, HERE << "Unhandled message type: " << message.type()); + debugs(54, DBG_IMPORTANT, "WARNING: Ignoring IPC message with an unknown type: " << message.rawType()); break; } } -void Ipc::Strand::handleRegistrationResponse(const HereIamMessage &msg) +void +Ipc::Strand::handleRegistrationResponse(const StrandMessage &msg) { // handle registration response from the coordinator; it could be stale if (msg.strand.kidId == KidIdentifier && msg.strand.pid == getpid()) { diff --git a/src/ipc/Strand.h b/src/ipc/Strand.h index c6392494d1..9ac1273da0 100644 --- a/src/ipc/Strand.h +++ b/src/ipc/Strand.h @@ -39,7 +39,7 @@ protected: private: void registerSelf(); /// let Coordinator know this strand exists - void handleRegistrationResponse(const HereIamMessage &msg); + void handleRegistrationResponse(const StrandMessage &); void handleCacheMgrRequest(const Mgr::Request& request); void handleCacheMgrResponse(const Mgr::Response& response); #if SQUID_SNMP diff --git a/src/ipc/StrandCoord.cc b/src/ipc/StrandCoord.cc index c65581f0ee..fc35b0fb86 100644 --- a/src/ipc/StrandCoord.cc +++ b/src/ipc/StrandCoord.cc @@ -10,7 +10,8 @@ #include "squid.h" #include "Debug.h" -#include "ipc/Messages.h" +#include "globals.h" +#include "ipc/Port.h" #include "ipc/StrandCoord.h" #include "ipc/TypedMsgHdr.h" @@ -37,20 +38,32 @@ void Ipc::StrandCoord::pack(TypedMsgHdr &hdrMsg) const hdrMsg.putString(tag); } -Ipc::HereIamMessage::HereIamMessage(const StrandCoord &aStrand): +Ipc::StrandMessage::StrandMessage(const StrandCoord &aStrand): strand(aStrand) { } -Ipc::HereIamMessage::HereIamMessage(const TypedMsgHdr &hdrMsg) +Ipc::StrandMessage::StrandMessage(const TypedMsgHdr &hdrMsg) { - hdrMsg.checkType(mtRegistration); strand.unpack(hdrMsg); } -void Ipc::HereIamMessage::pack(TypedMsgHdr &hdrMsg) const +void +Ipc::StrandMessage::pack(const MessageType messageType, TypedMsgHdr &hdrMsg) const { - hdrMsg.setType(mtRegistration); + hdrMsg.setType(messageType); strand.pack(hdrMsg); } +void +Ipc::StrandMessage::NotifyCoordinator(const MessageType msgType, const char *tag) +{ + static const auto pid = getpid(); + StrandMessage message(StrandCoord(KidIdentifier, pid)); + if (tag) + message.strand.tag = tag; + TypedMsgHdr hdr; + message.pack(msgType, hdr); + SendMessage(Port::CoordinatorAddr(), hdr); +} + diff --git a/src/ipc/StrandCoord.h b/src/ipc/StrandCoord.h index d58e96e3ef..2758d7777c 100644 --- a/src/ipc/StrandCoord.h +++ b/src/ipc/StrandCoord.h @@ -10,6 +10,7 @@ #define SQUID_IPC_STRAND_COORD_H #include "ipc/forward.h" +#include "ipc/Messages.h" #include "SquidString.h" namespace Ipc @@ -32,16 +33,19 @@ public: String tag; ///< optional unique well-known key (e.g., cache_dir path) }; -/// strand registration with Coordinator (also used as an ACK) -class HereIamMessage +/// an IPC message carrying StrandCoord +class StrandMessage { public: - explicit HereIamMessage(const StrandCoord &strand); ///< from registrant - explicit HereIamMessage(const TypedMsgHdr &hdrMsg); ///< from recvmsg() - void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg() + explicit StrandMessage(const StrandCoord &); + explicit StrandMessage(const TypedMsgHdr &); + void pack(MessageType, TypedMsgHdr &) const; + + /// creates and sends StrandMessage to Coordinator + static void NotifyCoordinator(MessageType, const char *tag); public: - StrandCoord strand; ///< registrant coordinates and related details + StrandCoord strand; ///< messageType-specific coordinates (e.g., sender) }; } // namespace Ipc; diff --git a/src/ipc/StrandSearch.cc b/src/ipc/StrandSearch.cc index 30d8b1b1e4..13f9406ebb 100644 --- a/src/ipc/StrandSearch.cc +++ b/src/ipc/StrandSearch.cc @@ -20,34 +20,15 @@ Ipc::StrandSearchRequest::StrandSearchRequest(): requestorId(-1) Ipc::StrandSearchRequest::StrandSearchRequest(const TypedMsgHdr &hdrMsg): requestorId(-1) { - hdrMsg.checkType(mtStrandSearchRequest); + hdrMsg.checkType(mtFindStrand); hdrMsg.getPod(requestorId); hdrMsg.getString(tag); } void Ipc::StrandSearchRequest::pack(TypedMsgHdr &hdrMsg) const { - hdrMsg.setType(mtStrandSearchRequest); + hdrMsg.setType(mtFindStrand); hdrMsg.putPod(requestorId); hdrMsg.putString(tag); } -/* StrandSearchResponse */ - -Ipc::StrandSearchResponse::StrandSearchResponse(const Ipc::StrandCoord &aStrand): - strand(aStrand) -{ -} - -Ipc::StrandSearchResponse::StrandSearchResponse(const TypedMsgHdr &hdrMsg) -{ - hdrMsg.checkType(mtStrandSearchResponse); - strand.unpack(hdrMsg); -} - -void Ipc::StrandSearchResponse::pack(TypedMsgHdr &hdrMsg) const -{ - hdrMsg.setType(mtStrandSearchResponse); - strand.pack(hdrMsg); -} - diff --git a/src/ipc/StrandSearch.h b/src/ipc/StrandSearch.h index ef76b60870..3c3cd9d8cf 100644 --- a/src/ipc/StrandSearch.h +++ b/src/ipc/StrandSearch.h @@ -29,18 +29,6 @@ public: String tag; ///< set when looking for a matching StrandCoord::tag }; -/// asynchronous strand search response -class StrandSearchResponse -{ -public: - StrandSearchResponse(const StrandCoord &strand); - explicit StrandSearchResponse(const TypedMsgHdr &hdrMsg); ///< from recvmsg() - void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg() - -public: - StrandCoord strand; ///< answer matching StrandSearchRequest criteria -}; - } // namespace Ipc; #endif /* SQUID_IPC_STRAND_SEARCH_H */ diff --git a/src/ipc/TypedMsgHdr.cc b/src/ipc/TypedMsgHdr.cc index fadaf75ba9..c115944f93 100644 --- a/src/ipc/TypedMsgHdr.cc +++ b/src/ipc/TypedMsgHdr.cc @@ -81,13 +81,6 @@ void Ipc::TypedMsgHdr::sync() offset = 0; } -int -Ipc::TypedMsgHdr::type() const -{ - Must(msg_iovlen == 1); - return data.type_; -} - void Ipc::TypedMsgHdr::address(const struct sockaddr_un& addr) { @@ -100,7 +93,7 @@ Ipc::TypedMsgHdr::address(const struct sockaddr_un& addr) void Ipc::TypedMsgHdr::checkType(int destType) const { - Must(type() == destType); + Must(rawType() == destType); } void diff --git a/src/ipc/TypedMsgHdr.h b/src/ipc/TypedMsgHdr.h index 37d471a698..5fc7ac443f 100644 --- a/src/ipc/TypedMsgHdr.h +++ b/src/ipc/TypedMsgHdr.h @@ -12,6 +12,7 @@ #define SQUID_IPC_TYPED_MSG_HDR_H #include "compat/cmsg.h" +#include "ipc/Messages.h" #if HAVE_SYS_SOCKET_H #include #endif @@ -43,7 +44,9 @@ public: /* message type manipulation; these must be called before put/get*() */ void setType(int aType); ///< sets message type; use MessageType enum void checkType(int aType) const; ///< throws if stored type is not aType - int type() const; ///< returns stored type or zero if none + /// received or set message kind; may not be a MessageType value + /// \returns 0 if no message kind has been received or set + int rawType() const { return msg_iov ? data.type_ : 0; } /* access for Plain Old Data (POD)-based message parts */ template diff --git a/src/ipc/forward.h b/src/ipc/forward.h index faeed7fe9b..8fff7804a1 100644 --- a/src/ipc/forward.h +++ b/src/ipc/forward.h @@ -16,8 +16,7 @@ namespace Ipc class TypedMsgHdr; class StrandCoord; -class HereIamMessage; -class StrandSearchResponse; +class StrandMessage; class Forwarder; class Inquirer; class Request;