No functionality changes other than minor debugging improvements.
* replaced identical (except for the message kind value) HereIamMessage
and StrandSearchResponse classes with StrandMessage
* reduced code duplication with a new StrandMessage::NotifyCoordinator()
* split TypedMsgHdr::type() into unchecked rawType() and checked type()
* renamed and documented several Ipc::MessageType enum values
The above code improvements will help with adding more IPC messages.
#include "ipc/Messages.h"
#include "ipc/Port.h"
#include "ipc/Queue.h"
+#include "ipc/StrandCoord.h"
#include "ipc/StrandSearch.h"
#include "ipc/UdsOp.h"
#include "sbuf/SBuf.h"
queue->localRateLimit().store(config.ioRate);
- Ipc::HereIamMessage ann(Ipc::StrandCoord(KidIdentifier, myPid));
- ann.strand.tag = dbName;
- Ipc::TypedMsgHdr message;
- ann.pack(message);
- SendMessage(Ipc::Port::CoordinatorAddr(), message);
+ Ipc::StrandMessage::NotifyCoordinator(Ipc::mtRegisterStrand, dbName.termedBuf());
ioRequestor->ioCompletedNotification();
return;
}
void
-IpcIoFile::openCompleted(const Ipc::StrandSearchResponse *const response)
+IpcIoFile::openCompleted(const Ipc::StrandMessage *const response)
{
Must(diskId < 0); // we do not know our disker yet
/// called when coordinator responds to worker open request
void
-IpcIoFile::HandleOpenResponse(const Ipc::StrandSearchResponse &response)
+IpcIoFile::HandleOpenResponse(const Ipc::StrandMessage &response)
{
debugs(47, 7, HERE << "coordinator response to open request");
for (IpcIoFileList::iterator i = WaitingForOpen.begin();
virtual bool ioInProgress() const;
/// handle open response from coordinator
- static void HandleOpenResponse(const Ipc::StrandSearchResponse &response);
+ static void HandleOpenResponse(const Ipc::StrandMessage &);
/// handle queue push notifications from worker or disker
static void HandleNotification(const Ipc::TypedMsgHdr &msg);
protected:
friend class IpcIoPendingRequest;
- void openCompleted(const Ipc::StrandSearchResponse *const response);
+ void openCompleted(const Ipc::StrandMessage *);
void readCompleted(ReadRequest *readRequest, IpcIoMsg *const response);
void writeCompleted(WriteRequest *writeRequest, const IpcIoMsg *const response);
bool canWait() const;
void Ipc::Coordinator::receive(const TypedMsgHdr& message)
{
- switch (message.type()) {
- case mtRegistration:
+ switch (message.rawType()) {
+ case mtRegisterStrand:
debugs(54, 6, HERE << "Registration request");
- handleRegistrationRequest(HereIamMessage(message));
+ handleRegistrationRequest(StrandMessage(message));
break;
- case mtStrandSearchRequest: {
+ case mtFindStrand: {
const StrandSearchRequest sr(message);
debugs(54, 6, HERE << "Strand search request: " << sr.requestorId <<
" tag: " << sr.tag);
#endif
default:
- debugs(54, DBG_IMPORTANT, HERE << "Unhandled message type: " << message.type());
+ debugs(54, DBG_IMPORTANT, "WARNING: Ignoring IPC message with an unknown type: " << message.rawType());
break;
}
}
-void Ipc::Coordinator::handleRegistrationRequest(const HereIamMessage& msg)
+void Ipc::Coordinator::handleRegistrationRequest(const StrandMessage& msg)
{
registerStrand(msg.strand);
// send back an acknowledgement; TODO: remove as not needed?
TypedMsgHdr message;
- msg.pack(message);
+ msg.pack(mtStrandRegistered, message);
SendMessage(MakeAddr(strandAddrLabel, msg.strand.kidId), message);
}
{
debugs(54, 3, HERE << "tell kid" << request.requestorId << " that " <<
request.tag << " is kid" << strand.kidId);
- const StrandSearchResponse response(strand);
+ const StrandMessage response(strand);
TypedMsgHdr message;
- response.pack(message);
+ response.pack(mtStrandReady, message);
SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
}
StrandCoord* findStrand(int kidId); ///< registered strand or NULL
void registerStrand(const StrandCoord &); ///< adds or updates existing
- void handleRegistrationRequest(const HereIamMessage &); ///< register,ACK
+ void handleRegistrationRequest(const StrandMessage &); ///< register,ACK
/// answer the waiting search request
void notifySearcher(const StrandSearchRequest &request, const StrandCoord&);
{
/// message class identifier
-typedef enum { mtNone = 0, mtRegistration,
- mtStrandSearchRequest, mtStrandSearchResponse,
- mtSharedListenRequest, mtSharedListenResponse,
+typedef enum { mtNone = 0, ///< unspecified or unknown message kind; unused on the wire
+
+ mtRegisterStrand, ///< notifies about our strand existence
+ mtStrandRegistered, ///< acknowledges mtRegisterStrand acceptance
+
+ mtFindStrand, ///< a worker requests a strand from Coordinator
+ mtStrandReady, ///< an mtFindStrand answer: the strand exists and should be usable
+
+ mtSharedListenRequest,
+ mtSharedListenResponse,
+
mtIpcIoNotification,
+
mtCollapsedForwardingNotification,
- mtCacheMgrRequest, mtCacheMgrResponse
+
+ mtCacheMgrRequest,
+ mtCacheMgrResponse,
+
#if SQUID_SNMP
- ,
- mtSnmpRequest, mtSnmpResponse
+ mtSnmpRequest,
+ mtSnmpResponse,
#endif
+
+ mtEnd ///< for message kind range checks; unused on the wire
} MessageType;
} // namespace Ipc;
" [" << this << ']');
if (params.flag == Comm::OK) {
assert(params.buf == buf.raw());
+ debugs(54, 6, "message type: " << buf.rawType());
receive(buf);
}
// TODO: if there was a fatal error on our socket, close the socket before
debugs(54, 6, HERE);
Must(!isRegistered);
- HereIamMessage ann(StrandCoord(KidIdentifier, getpid()));
- TypedMsgHdr message;
- ann.pack(message);
- SendMessage(Port::CoordinatorAddr(), message);
+ StrandMessage::NotifyCoordinator(mtRegisterStrand, nullptr);
setTimeout(6, "Ipc::Strand::timeoutHandler"); // TODO: make 6 configurable?
}
void Ipc::Strand::receive(const TypedMsgHdr &message)
{
- debugs(54, 6, HERE << message.type());
- switch (message.type()) {
+ switch (message.rawType()) {
- case mtRegistration:
- handleRegistrationResponse(HereIamMessage(message));
+ case mtStrandRegistered:
+ handleRegistrationResponse(StrandMessage(message));
break;
case mtSharedListenResponse:
break;
#if HAVE_DISKIO_MODULE_IPCIO
- case mtStrandSearchResponse:
- IpcIoFile::HandleOpenResponse(StrandSearchResponse(message));
+ case mtStrandReady:
+ IpcIoFile::HandleOpenResponse(StrandMessage(message));
break;
case mtIpcIoNotification:
#endif
default:
- debugs(54, DBG_IMPORTANT, HERE << "Unhandled message type: " << message.type());
+ debugs(54, DBG_IMPORTANT, "WARNING: Ignoring IPC message with an unknown type: " << message.rawType());
break;
}
}
-void Ipc::Strand::handleRegistrationResponse(const HereIamMessage &msg)
+void
+Ipc::Strand::handleRegistrationResponse(const StrandMessage &msg)
{
// handle registration response from the coordinator; it could be stale
if (msg.strand.kidId == KidIdentifier && msg.strand.pid == getpid()) {
private:
void registerSelf(); /// let Coordinator know this strand exists
- void handleRegistrationResponse(const HereIamMessage &msg);
+ void handleRegistrationResponse(const StrandMessage &);
void handleCacheMgrRequest(const Mgr::Request& request);
void handleCacheMgrResponse(const Mgr::Response& response);
#if SQUID_SNMP
#include "squid.h"
#include "Debug.h"
-#include "ipc/Messages.h"
+#include "globals.h"
+#include "ipc/Port.h"
#include "ipc/StrandCoord.h"
#include "ipc/TypedMsgHdr.h"
hdrMsg.putString(tag);
}
-Ipc::HereIamMessage::HereIamMessage(const StrandCoord &aStrand):
+Ipc::StrandMessage::StrandMessage(const StrandCoord &aStrand):
strand(aStrand)
{
}
-Ipc::HereIamMessage::HereIamMessage(const TypedMsgHdr &hdrMsg)
+Ipc::StrandMessage::StrandMessage(const TypedMsgHdr &hdrMsg)
{
- hdrMsg.checkType(mtRegistration);
strand.unpack(hdrMsg);
}
-void Ipc::HereIamMessage::pack(TypedMsgHdr &hdrMsg) const
+void
+Ipc::StrandMessage::pack(const MessageType messageType, TypedMsgHdr &hdrMsg) const
{
- hdrMsg.setType(mtRegistration);
+ hdrMsg.setType(messageType);
strand.pack(hdrMsg);
}
+void
+Ipc::StrandMessage::NotifyCoordinator(const MessageType msgType, const char *tag)
+{
+ static const auto pid = getpid();
+ StrandMessage message(StrandCoord(KidIdentifier, pid));
+ if (tag)
+ message.strand.tag = tag;
+ TypedMsgHdr hdr;
+ message.pack(msgType, hdr);
+ SendMessage(Port::CoordinatorAddr(), hdr);
+}
+
#define SQUID_IPC_STRAND_COORD_H
#include "ipc/forward.h"
+#include "ipc/Messages.h"
#include "SquidString.h"
namespace Ipc
String tag; ///< optional unique well-known key (e.g., cache_dir path)
};
-/// strand registration with Coordinator (also used as an ACK)
-class HereIamMessage
+/// an IPC message carrying StrandCoord
+class StrandMessage
{
public:
- explicit HereIamMessage(const StrandCoord &strand); ///< from registrant
- explicit HereIamMessage(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
- void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
+ explicit StrandMessage(const StrandCoord &);
+ explicit StrandMessage(const TypedMsgHdr &);
+ void pack(MessageType, TypedMsgHdr &) const;
+
+ /// creates and sends StrandMessage to Coordinator
+ static void NotifyCoordinator(MessageType, const char *tag);
public:
- StrandCoord strand; ///< registrant coordinates and related details
+ StrandCoord strand; ///< messageType-specific coordinates (e.g., sender)
};
} // namespace Ipc;
Ipc::StrandSearchRequest::StrandSearchRequest(const TypedMsgHdr &hdrMsg):
requestorId(-1)
{
- hdrMsg.checkType(mtStrandSearchRequest);
+ hdrMsg.checkType(mtFindStrand);
hdrMsg.getPod(requestorId);
hdrMsg.getString(tag);
}
void Ipc::StrandSearchRequest::pack(TypedMsgHdr &hdrMsg) const
{
- hdrMsg.setType(mtStrandSearchRequest);
+ hdrMsg.setType(mtFindStrand);
hdrMsg.putPod(requestorId);
hdrMsg.putString(tag);
}
-/* StrandSearchResponse */
-
-Ipc::StrandSearchResponse::StrandSearchResponse(const Ipc::StrandCoord &aStrand):
- strand(aStrand)
-{
-}
-
-Ipc::StrandSearchResponse::StrandSearchResponse(const TypedMsgHdr &hdrMsg)
-{
- hdrMsg.checkType(mtStrandSearchResponse);
- strand.unpack(hdrMsg);
-}
-
-void Ipc::StrandSearchResponse::pack(TypedMsgHdr &hdrMsg) const
-{
- hdrMsg.setType(mtStrandSearchResponse);
- strand.pack(hdrMsg);
-}
-
String tag; ///< set when looking for a matching StrandCoord::tag
};
-/// asynchronous strand search response
-class StrandSearchResponse
-{
-public:
- StrandSearchResponse(const StrandCoord &strand);
- explicit StrandSearchResponse(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
- void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
-
-public:
- StrandCoord strand; ///< answer matching StrandSearchRequest criteria
-};
-
} // namespace Ipc;
#endif /* SQUID_IPC_STRAND_SEARCH_H */
offset = 0;
}
-int
-Ipc::TypedMsgHdr::type() const
-{
- Must(msg_iovlen == 1);
- return data.type_;
-}
-
void
Ipc::TypedMsgHdr::address(const struct sockaddr_un& addr)
{
void
Ipc::TypedMsgHdr::checkType(int destType) const
{
- Must(type() == destType);
+ Must(rawType() == destType);
}
void
#define SQUID_IPC_TYPED_MSG_HDR_H
#include "compat/cmsg.h"
+#include "ipc/Messages.h"
#if HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif
/* message type manipulation; these must be called before put/get*() */
void setType(int aType); ///< sets message type; use MessageType enum
void checkType(int aType) const; ///< throws if stored type is not aType
- int type() const; ///< returns stored type or zero if none
+ /// received or set message kind; may not be a MessageType value
+ /// \returns 0 if no message kind has been received or set
+ int rawType() const { return msg_iov ? data.type_ : 0; }
/* access for Plain Old Data (POD)-based message parts */
template <class Pod>
class TypedMsgHdr;
class StrandCoord;
-class HereIamMessage;
-class StrandSearchResponse;
+class StrandMessage;
class Forwarder;
class Inquirer;
class Request;