return;
}
- Ipc::StrandSearchRequest request;
- request.requestorId = KidIdentifier;
- request.tag = dbName;
-
+ const Ipc::StrandSearchRequest request(dbName);
Ipc::TypedMsgHdr msg;
request.pack(msg);
Ipc::SendMessage(Ipc::Port::CoordinatorAddr(), msg);
Subscription.h \
TextException.cc \
TextException.h \
+ TypeTraits.h \
YesNoNone.h \
forward.h
--- /dev/null
+/*
+ * Copyright (C) 1996-2020 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#ifndef SQUID_SRC_BASE_TYPETRAITS_H
+#define SQUID_SRC_BASE_TYPETRAITS_H
+
+namespace TypeTraits_ { // a hack to prevent "unintended ADL"
+
+// TODO: Extract reusable paradigms into other mixins (e.g., NonCopyable).
+/// convenience base for any class with pure virtual method(s)
+class Interface
+{
+public:
+ // ensures proper destruction via pointers to base interface classes
+ virtual ~Interface() = default;
+
+ // prohibits copy/move assignment to prevent accidental object slicing
+ Interface &operator=(const Interface &) = delete;
+ Interface &operator=(Interface &&) = delete;
+
+protected: // prevents accidental creation of Interface instances
+
+ // allows default-construction in kids
+ constexpr Interface() = default;
+
+ // allows copy/move construction for kids convenience
+ Interface(const Interface &) = default;
+ Interface(Interface &&) = default;
+};
+
+} // namespace TypeTraits_
+
+using Interface = TypeTraits_::Interface;
+
+#endif /* SQUID_SRC_BASE_TYPETRAITS_H */
+
{
public:
- /** @name Constructors and Destructor */
+ /** @name Constructors */
/*@{*/
Address() { setEmpty(); }
Address(const struct in_addr &);
Address(const struct hostent &);
Address(const struct addrinfo &);
Address(const char*);
- ~Address() {}
/*@}*/
/** @name Assignment Operators */
case mtCacheMgrResponse: {
debugs(54, 6, HERE << "Cache manager response");
const Mgr::Response resp(message);
- handleCacheMgrResponse(resp);
+ handleCacheMgrResponse(Mine(resp));
}
break;
case mtSnmpResponse: {
debugs(54, 6, HERE << "SNMP response");
const Snmp::Response resp(message);
- handleSnmpResponse(resp);
+ handleSnmpResponse(Mine(resp));
}
break;
#endif
default:
- debugs(54, DBG_IMPORTANT, "WARNING: Ignoring IPC message with an unknown type: " << message.rawType());
+ Port::receive(message);
break;
}
}
{
debugs(54, 3, HERE << "tell kid" << request.requestorId << " that " <<
request.tag << " is kid" << strand.kidId);
- const StrandMessage response(strand);
+ const StrandMessage response(strand, request.qid);
TypedMsgHdr message;
response.pack(mtStrandReady, message);
SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
#include "HttpRequest.h"
#include "ipc/Forwarder.h"
#include "ipc/Port.h"
+#include "ipc/RequestId.h"
#include "ipc/TypedMsgHdr.h"
CBDATA_NAMESPACED_CLASS_INIT(Ipc, Forwarder);
Ipc::Forwarder::RequestsMap Ipc::Forwarder::TheRequestsMap;
-unsigned int Ipc::Forwarder::LastRequestId = 0;
+Ipc::RequestId::Index Ipc::Forwarder::LastRequestId = 0;
Ipc::Forwarder::Forwarder(Request::Pointer aRequest, double aTimeout):
AsyncJob("Ipc::Forwarder"),
/// returns and forgets the right Forwarder callback for the request
AsyncCall::Pointer
-Ipc::Forwarder::DequeueRequest(unsigned int requestId)
+Ipc::Forwarder::DequeueRequest(const RequestId::Index requestId)
{
debugs(54, 3, HERE);
Must(requestId != 0);
}
void
-Ipc::Forwarder::HandleRemoteAck(unsigned int requestId)
+Ipc::Forwarder::HandleRemoteAck(const RequestId requestId)
{
debugs(54, 3, HERE);
Must(requestId != 0);
virtual ~Forwarder();
/// finds and calls the right Forwarder upon Coordinator's response
- static void HandleRemoteAck(unsigned int requestId);
+ static void HandleRemoteAck(RequestId);
/* has-to-be-public AsyncJob API */
virtual void callException(const std::exception& e);
void handleRemoteAck();
- static AsyncCall::Pointer DequeueRequest(unsigned int requestId);
+ static AsyncCall::Pointer DequeueRequest(RequestId::Index);
protected:
Request::Pointer request;
const double timeout; ///< response wait timeout in seconds
/// maps request->id to Forwarder::handleRemoteAck callback
- typedef std::map<unsigned int, AsyncCall::Pointer> RequestsMap;
+ typedef std::map<RequestId::Index, AsyncCall::Pointer> RequestsMap;
static RequestsMap TheRequestsMap; ///< pending Coordinator requests
- static unsigned int LastRequestId; ///< last requestId used
+ static RequestId::Index LastRequestId; ///< last requestId used
};
} // namespace Ipc
CBDATA_NAMESPACED_CLASS_INIT(Ipc, Inquirer);
Ipc::Inquirer::RequestsMap Ipc::Inquirer::TheRequestsMap;
-unsigned int Ipc::Inquirer::LastRequestId = 0;
+Ipc::RequestId::Index Ipc::Inquirer::LastRequestId = 0;
/// compare Ipc::StrandCoord using kidId, for std::sort() below
static bool
/// returns and forgets the right Inquirer callback for strand request
AsyncCall::Pointer
-Ipc::Inquirer::DequeueRequest(unsigned int requestId)
+Ipc::Inquirer::DequeueRequest(const RequestId::Index requestId)
{
debugs(54, 3, HERE << " requestId " << requestId);
Must(requestId != 0);
{
static MemBuf buf;
buf.reset();
- buf.appendf(" [request->requestId %u]", request->requestId);
+ buf.appendf(" [requestId %u]", request->requestId.index());
buf.terminate();
return buf.content();
}
void handleRemoteAck(Response::Pointer response);
- static AsyncCall::Pointer DequeueRequest(unsigned int requestId);
+ static AsyncCall::Pointer DequeueRequest(RequestId::Index);
static void RequestTimedOut(void* param);
void requestTimedOut();
const double timeout; ///< number of seconds to wait for strand response
/// maps request->id to Inquirer::handleRemoteAck callback
- typedef std::map<unsigned int, AsyncCall::Pointer> RequestsMap;
+ typedef std::map<RequestId::Index, AsyncCall::Pointer> RequestsMap;
static RequestsMap TheRequestsMap; ///< pending strand requests
- static unsigned int LastRequestId; ///< last requestId used
+ static RequestId::Index LastRequestId; ///< last requestId used
};
} // namespace Ipc
Messages.h \
Port.cc \
Port.h \
+ QuestionerId.cc \
+ QuestionerId.h \
Queue.cc \
Queue.h \
ReadWriteLock.cc \
ReadWriteLock.h \
Request.h \
+ RequestId.cc \
+ RequestId.h \
Response.h \
SharedListen.cc \
SharedListen.h \
#include "comm/Read.h"
#include "CommCalls.h"
#include "ipc/Port.h"
+#include "sbuf/Stream.h"
#include "tools.h"
#include "util.h"
return coordinatorAddr;
}
+void
+Ipc::Port::receive(const TypedMsgHdr &message)
+{
+ throw TextException(ToSBuf("bad IPC message type: ", message.rawType()), Here());
+}
+
+/// receive() but ignore any errors
+void
+Ipc::Port::receiveOrIgnore(const TypedMsgHdr &message)
+{
+ try {
+ receive(message);
+ } catch (...) {
+ debugs(54, DBG_IMPORTANT, "WARNING: Ignoring IPC message" <<
+ Debug::Extra << "message type: " << message.rawType() <<
+ Debug::Extra << "problem: " << CurrentException);
+ }
+}
+
void Ipc::Port::noteRead(const CommIoCbParams& params)
{
debugs(54, 6, HERE << params.conn << " flag " << params.flag <<
if (params.flag == Comm::OK) {
assert(params.buf == buf.raw());
debugs(54, 6, "message type: " << buf.rawType());
- receive(buf);
+ receiveOrIgnore(buf);
}
// TODO: if there was a fatal error on our socket, close the socket before
// trying to listen again and print a level-1 error message.
void doListen();
/// handle IPC message just read
- virtual void receive(const TypedMsgHdr& message) = 0;
+ /// kids must call parent method when they do not recognize the message type
+ virtual void receive(const TypedMsgHdr &) = 0;
private:
void noteRead(const CommIoCbParams ¶ms); // Comm callback API
+ void receiveOrIgnore(const TypedMsgHdr& );
private:
TypedMsgHdr buf; ///< msghdr struct filled by Comm
--- /dev/null
+/*
+ * Copyright (C) 1996-2020 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#include "squid.h"
+#include "base/TextException.h"
+#include "ipc/QuestionerId.h"
+#include "ipc/TypedMsgHdr.h"
+#include "sbuf/Stream.h"
+
+#include <iostream>
+
+Ipc::QuestionerId
+Ipc::MyQuestionerId()
+{
+ static const QuestionerId qid(getpid());
+ return qid;
+}
+
+void
+Ipc::QuestionerId::pack(TypedMsgHdr &hdrMsg) const
+{
+ hdrMsg.putPod(pid);
+}
+
+void
+Ipc::QuestionerId::unpack(const TypedMsgHdr &hdrMsg)
+{
+ hdrMsg.getPod(pid);
+}
+
+void
+Ipc::QuestionerId::rejectAnswerIfStale() const
+{
+ const auto myPid = MyQuestionerId().pid;
+ if (myPid != pid) {
+ throw TextException(ToSBuf("received answer to an IPC question asked by process ", pid,
+ Debug::Extra, "my process PID: ", myPid), Here());
+ }
+}
+
+void
+Ipc::QuestionerId::print(std::ostream &os) const
+{
+ os << pid;
+}
+
--- /dev/null
+/*
+ * Copyright (C) 1996-2020 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#ifndef SQUID_SRC_IPC_QUESTIONERID_H
+#define SQUID_SRC_IPC_QUESTIONERID_H
+
+#include "ipc/forward.h"
+
+#include <iosfwd>
+
+namespace Ipc
+{
+
+/// Identifies a kid process sending IPC messages that require an answer.
+/// Must be unique across all kids with pending questions.
+class QuestionerId
+{
+public:
+ /// to-be-determined ID
+ QuestionerId() = default;
+
+ /// for sending the ID of the asking process
+ void pack(TypedMsgHdr &) const;
+
+ /// for receiving the ID of the asking process
+ void unpack(const TypedMsgHdr &);
+
+ /// does nothing but throws if the questioner was not the current process
+ void rejectAnswerIfStale() const;
+
+ /// reports the stored opaque ID value (for debugging)
+ void print(std::ostream &) const;
+
+private:
+ /// for MyQuestionerId() convenience
+ explicit QuestionerId(const pid_t aPid): pid(aPid) {}
+ friend QuestionerId MyQuestionerId();
+
+ /// OS process ID of the asking kid. If the kid restarts, it is assumed
+ /// not to wrap back to the old value until the answer is received.
+ pid_t pid = -1;
+};
+
+/// the questioner ID of the current/calling process
+QuestionerId MyQuestionerId();
+
+/// Convenience wrapper for rejecting (freshly parsed) stale answers.
+/// All answers are assumed to have a "QuestionerId intendedRecepient()" member.
+template <class Answer>
+const Answer &
+Mine(const Answer &answer)
+{
+ answer.intendedRecepient().rejectAnswerIfStale();
+ return answer;
+}
+
+inline std::ostream &
+operator <<(std::ostream &os, const QuestionerId &qid)
+{
+ qid.print(os);
+ return os;
+}
+
+} // namespace Ipc;
+
+#endif /* SQUID_SRC_IPC_QUESTIONERID_H */
+
#define SQUID_IPC_REQUEST_H
#include "base/RefCount.h"
-#include "ipc/forward.h"
+#include "base/TypeTraits.h"
+#include "ipc/RequestId.h"
namespace Ipc
{
+// TODO: Request and Response ought to have their own un/pack() methods instead
+// of duplicating their functionality in derived classes. To avoid dependency
+// loops between libipc and libmgr/libsnmp, fixing that requires extracting
+// src/ipc/Coordinator and its friends into a new src/coordinator/ library.
+
/// IPC request
-class Request: public RefCountable
+class Request: public RefCountable, public Interface
{
public:
typedef RefCount<Request> Pointer;
public:
- Request(int aRequestorId, unsigned int aRequestId):
- requestorId(aRequestorId), requestId(aRequestId) {}
-
virtual void pack(TypedMsgHdr& msg) const = 0; ///< prepare for sendmsg()
virtual Pointer clone() const = 0; ///< returns a copy of this
-private:
- Request(const Request&); // not implemented
- Request& operator= (const Request&); // not implemented
-
public:
- int requestorId; ///< kidId of the requestor; used for response destination
- unsigned int requestId; ///< unique for sender; matches request w/ response
+ int requestorId = 0; ///< kidId of the requestor; used for response destination
+ RequestId requestId; ///< matches the request[or] with the response
+
+protected:
+ /// sender's constructor
+ Request(const int aRequestorId, const RequestId aRequestId):
+ requestorId(aRequestorId),
+ requestId(aRequestId)
+ {
+ }
+
+ /// recipient's constructor
+ Request() = default;
};
} // namespace Ipc
--- /dev/null
+/*
+ * Copyright (C) 1996-2020 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#include "squid.h"
+#include "base/TextException.h"
+#include "Debug.h"
+#include "ipc/RequestId.h"
+
+#include <iostream>
+
+Ipc::RequestId::RequestId(const Index anIndex):
+ qid_(anIndex ? MyQuestionerId() : QuestionerId()),
+ index_(anIndex)
+{
+}
+
+std::ostream &
+Ipc::operator <<(std::ostream &os, const RequestId &requestId)
+{
+ os << requestId.index() << '@' << requestId.questioner();
+ return os;
+}
+
--- /dev/null
+/*
+ * Copyright (C) 1996-2020 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#ifndef SQUID_IPC_REQUESTID_H
+#define SQUID_IPC_REQUESTID_H
+
+#include "ipc/forward.h"
+#include "ipc/QuestionerId.h"
+
+#include <iosfwd>
+
+namespace Ipc
+{
+
+/// uniquely identifies an IPC request among same-type concurrent IPC requests
+/// submitted by a single Squid instance
+class RequestId
+{
+public:
+ /// A simple ID for correlating IPC responses with pending requests.
+ /// Value 0 has a special meaning of "unset/unknown", but otherwise opaque.
+ typedef unsigned int Index;
+
+ /// Request sender's constructor.
+ /// For performance and clarity sake, default constructor is preferred to 0 index.
+ explicit RequestId(Index);
+
+ /// request recipient's constructor
+ RequestId() = default;
+
+ /// Make the ID unset/unknown.
+ /// Optimization: leaves the questioner field alone.
+ void reset() { index_ = 0; }
+
+ /// Make the ID set/known with the given (by the questioner) index.
+ /// For performance and clarity sake, reset(void) is preferred to reset(0).
+ void reset(const Index anIndex) { *this = RequestId(anIndex); }
+
+ QuestionerId questioner() const { return qid_; }
+ Index index() const { return index_; }
+
+ // these conversion operators allow our users to treat us as an Index
+ operator Index() const { return index_; }
+ RequestId &operator =(const Index anIndex) { anIndex ? reset(anIndex) : reset(); return *this; }
+
+private:
+ /// the sender of the request
+ QuestionerId qid_;
+
+ /// request ID; unique within pending same-qid_ questions of the same kind
+ Index index_ = 0;
+};
+
+std::ostream &operator <<(std::ostream &, const RequestId &);
+
+} // namespace Ipc;
+
+#endif /* SQUID_IPC_REQUESTID_H */
+
#define SQUID_IPC_RESPONSE_H
#include "base/RefCount.h"
+#include "base/TypeTraits.h"
#include "ipc/forward.h"
+#include "ipc/QuestionerId.h"
namespace Ipc
{
/// A response to Ipc::Request.
-class Response: public RefCountable
+class Response: public RefCountable, public Interface
{
public:
typedef RefCount<Response> Pointer;
public:
- explicit Response(unsigned int aRequestId):
- requestId(aRequestId) {}
-
virtual void pack(TypedMsgHdr& msg) const = 0; ///< prepare for sendmsg()
virtual Pointer clone() const = 0; ///< returns a copy of this
-private:
- Response(const Response&); // not implemented
- Response& operator= (const Response&); // not implemented
+ /// for Mine() tests
+ QuestionerId intendedRecepient() const { return requestId.questioner(); }
public:
- unsigned int requestId; ///< ID of request we are responding to
-};
+ RequestId requestId; ///< the ID of the request we are responding to
-inline
-std::ostream& operator << (std::ostream &os, const Response& response)
-{
- os << "[response.requestId %u]" << response.requestId << '}';
- return os;
-}
+protected:
+ /// sender's constructor
+ explicit Response(const RequestId aRequestId): requestId(aRequestId) {}
+
+ /// recipient's constructor
+ Response() = default;
+};
} // namespace Ipc
};
/// maps ID assigned at request time to the response callback
-typedef std::map<int, PendingOpenRequest> SharedListenRequestMap;
+typedef std::map<Ipc::RequestId::Index, PendingOpenRequest> SharedListenRequestMap;
static SharedListenRequestMap TheSharedListenRequestMap;
/// accumulates delayed requests until they are ready to be sent, in FIFO order
typedef std::list<PendingOpenRequest> DelayedSharedListenRequests;
static DelayedSharedListenRequests TheDelayedRequests;
-static int
+// TODO: Encapsulate "Pending Request Map" logic shared by all RequestId users.
+/// registers the given request in the collection of pending requests
+/// \returns the registration key
+static Ipc::RequestId::Index
AddToMap(const PendingOpenRequest &por)
{
- // find unused ID using linear search; there should not be many entries
- for (int id = 0; true; ++id) {
- if (TheSharedListenRequestMap.find(id) == TheSharedListenRequestMap.end()) {
- TheSharedListenRequestMap[id] = por;
- return id;
- }
- }
- assert(false); // not reached
- return -1;
+ static Ipc::RequestId::Index LastIndex = 0;
+ // TODO: Switch Ipc::RequestId::Index to uint64_t and drop these 0 checks.
+ if (++LastIndex == 0) // don't use zero value as an ID
+ ++LastIndex;
+ assert(TheSharedListenRequestMap.find(LastIndex) == TheSharedListenRequestMap.end());
+ TheSharedListenRequestMap[LastIndex] = por;
+ return LastIndex;
}
bool
return addr.compareWhole(p.addr) < 0;
}
-Ipc::SharedListenRequest::SharedListenRequest(): requestorId(-1), mapId(-1)
+Ipc::SharedListenRequest::SharedListenRequest(const OpenListenerParams &aParams, const RequestId aMapId):
+ requestorId(KidIdentifier),
+ params(aParams),
+ mapId(aMapId)
{
// caller will then set public data members
}
Ipc::SharedListenRequest::SharedListenRequest(const TypedMsgHdr &hdrMsg)
{
hdrMsg.checkType(mtSharedListenRequest);
+ // XXX: our handlerSubscription is not a POD!
hdrMsg.getPod(*this);
}
void Ipc::SharedListenRequest::pack(TypedMsgHdr &hdrMsg) const
{
hdrMsg.setType(mtSharedListenRequest);
+ // XXX: our handlerSubscription is not a POD!
hdrMsg.putPod(*this);
}
-Ipc::SharedListenResponse::SharedListenResponse(int aFd, int anErrNo, int aMapId):
+Ipc::SharedListenResponse::SharedListenResponse(const int aFd, const int anErrNo, const RequestId aMapId):
fd(aFd), errNo(anErrNo), mapId(aMapId)
{
}
Ipc::SharedListenResponse::SharedListenResponse(const TypedMsgHdr &hdrMsg):
- fd(-1), errNo(0), mapId(-1)
+ fd(-1),
+ errNo(0)
{
hdrMsg.checkType(mtSharedListenResponse);
hdrMsg.getPod(*this);
{
hdrMsg.setType(mtSharedListenResponse);
hdrMsg.putPod(*this);
+ // XXX: When we respond with an error, putFd() throws due to the negative fd
hdrMsg.putFd(fd);
}
static void
SendSharedListenRequest(const PendingOpenRequest &por)
{
- Ipc::SharedListenRequest request;
- request.requestorId = KidIdentifier;
- request.params = por.params;
- request.mapId = AddToMap(por);
+ const Ipc::SharedListenRequest request(por.params, Ipc::RequestId(AddToMap(por)));
debugs(54, 3, "getting listening FD for " << request.params.addr <<
" mapId=" << request.mapId);
TheSharedListenRequestMap.size() << " active + " <<
TheDelayedRequests.size() << " delayed requests");
- Must(TheSharedListenRequestMap.find(response.mapId) != TheSharedListenRequestMap.end());
- PendingOpenRequest por = TheSharedListenRequestMap[response.mapId];
- Must(por.callback != NULL);
- TheSharedListenRequestMap.erase(response.mapId);
+ Must(response.mapId);
+ const auto pori = TheSharedListenRequestMap.find(response.mapId.index());
+ Must(pori != TheSharedListenRequestMap.end());
+ auto por = pori->second;
+ Must(por.callback);
+ TheSharedListenRequestMap.erase(pori);
StartListeningCb *cbd = dynamic_cast<StartListeningCb*>(por.callback->getDialer());
assert(cbd && cbd->conn != NULL);
#include "base/AsyncCall.h"
#include "base/Subscription.h"
+#include "ipc/QuestionerId.h"
+#include "ipc/RequestId.h"
namespace Ipc
{
class SharedListenRequest
{
public:
- SharedListenRequest(); ///< from OpenSharedListen() which then sets public data
+ SharedListenRequest(const OpenListenerParams &, RequestId aMapId); ///< sender's constructor
explicit SharedListenRequest(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
OpenListenerParams params; ///< actual comm_open_sharedListen() parameters
- int mapId; ///< to map future response to the requestor's callback
+ RequestId mapId; ///< to map future response to the requestor's callback
};
/// a response to SharedListenRequest
class SharedListenResponse
{
public:
- SharedListenResponse(int fd, int errNo, int mapId);
+ SharedListenResponse(int fd, int errNo, RequestId aMapId); ///< sender's constructor
explicit SharedListenResponse(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
+ /// for Mine() tests
+ QuestionerId intendedRecepient() const { return mapId.questioner(); }
+
public:
int fd; ///< opened listening socket or -1
int errNo; ///< errno value from comm_open_sharedListen() call
- int mapId; ///< to map future response to the requestor's callback
+ RequestId mapId; ///< to map future response to the requestor's callback
};
/// prepare and send SharedListenRequest to Coordinator
#include "globals.h"
#include "ipc/Kids.h"
#include "ipc/Messages.h"
+#include "ipc/QuestionerId.h"
#include "ipc/SharedListen.h"
#include "ipc/Strand.h"
#include "ipc/StrandCoord.h"
switch (message.rawType()) {
case mtStrandRegistered:
- handleRegistrationResponse(StrandMessage(message));
+ handleRegistrationResponse(Mine(StrandMessage(message)));
break;
case mtSharedListenResponse:
- SharedListenJoined(SharedListenResponse(message));
+ SharedListenJoined(Mine(SharedListenResponse(message)));
break;
#if HAVE_DISKIO_MODULE_IPCIO
case mtStrandReady:
- IpcIoFile::HandleOpenResponse(StrandMessage(message));
+ IpcIoFile::HandleOpenResponse(Mine(StrandMessage(message)));
break;
case mtIpcIoNotification:
case mtCacheMgrResponse: {
const Mgr::Response resp(message);
- handleCacheMgrResponse(resp);
+ handleCacheMgrResponse(Mine(resp));
}
break;
case mtSnmpResponse: {
const Snmp::Response resp(message);
- handleSnmpResponse(resp);
+ handleSnmpResponse(Mine(resp));
}
break;
#endif
default:
- debugs(54, DBG_IMPORTANT, "WARNING: Ignoring IPC message with an unknown type: " << message.rawType());
+ Port::receive(message);
break;
}
}
hdrMsg.putString(tag);
}
-Ipc::StrandMessage::StrandMessage(const StrandCoord &aStrand):
- strand(aStrand)
+Ipc::StrandMessage::StrandMessage(const StrandCoord &aStrand, const QuestionerId aQid):
+ strand(aStrand),
+ qid(aQid)
{
}
Ipc::StrandMessage::StrandMessage(const TypedMsgHdr &hdrMsg)
{
strand.unpack(hdrMsg);
+ qid.unpack(hdrMsg);
}
void
{
hdrMsg.setType(messageType);
strand.pack(hdrMsg);
+ qid.pack(hdrMsg);
}
void
Ipc::StrandMessage::NotifyCoordinator(const MessageType msgType, const char *tag)
{
static const auto pid = getpid();
- StrandMessage message(StrandCoord(KidIdentifier, pid));
+ StrandMessage message(StrandCoord(KidIdentifier, pid), MyQuestionerId());
if (tag)
message.strand.tag = tag;
TypedMsgHdr hdr;
#include "ipc/forward.h"
#include "ipc/Messages.h"
+#include "ipc/QuestionerId.h"
#include "SquidString.h"
namespace Ipc
class StrandMessage
{
public:
- explicit StrandMessage(const StrandCoord &);
+ explicit StrandMessage(const StrandCoord &, QuestionerId);
explicit StrandMessage(const TypedMsgHdr &);
void pack(MessageType, TypedMsgHdr &) const;
/// creates and sends StrandMessage to Coordinator
static void NotifyCoordinator(MessageType, const char *tag);
+ /// for Mine() tests
+ QuestionerId intendedRecepient() const { return qid; }
+
public:
StrandCoord strand; ///< messageType-specific coordinates (e.g., sender)
+
+ /// For IPC requests/questions: The sender of this request.
+ /// For IPC responses/answers: The sender of the corresponding request.
+ QuestionerId qid;
};
} // namespace Ipc;
/* DEBUG: section 54 Interprocess Communication */
#include "squid.h"
+#include "globals.h"
#include "ipc/Messages.h"
#include "ipc/StrandSearch.h"
#include "ipc/TypedMsgHdr.h"
-Ipc::StrandSearchRequest::StrandSearchRequest(): requestorId(-1)
+Ipc::StrandSearchRequest::StrandSearchRequest(const String &aTag):
+ requestorId(KidIdentifier),
+ tag(aTag),
+ qid(MyQuestionerId())
{
}
hdrMsg.checkType(mtFindStrand);
hdrMsg.getPod(requestorId);
hdrMsg.getString(tag);
+ qid.unpack(hdrMsg);
}
void Ipc::StrandSearchRequest::pack(TypedMsgHdr &hdrMsg) const
hdrMsg.setType(mtFindStrand);
hdrMsg.putPod(requestorId);
hdrMsg.putString(tag);
+ qid.pack(hdrMsg);
}
#define SQUID_IPC_STRAND_SEARCH_H
#include "ipc/forward.h"
+#include "ipc/QuestionerId.h"
#include "ipc/StrandCoord.h"
#include "SquidString.h"
class StrandSearchRequest
{
public:
- StrandSearchRequest();
+ explicit StrandSearchRequest(const String &aTag); ///< sender's constructor
explicit StrandSearchRequest(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
public:
int requestorId; ///< sender-provided return address
String tag; ///< set when looking for a matching StrandCoord::tag
+ QuestionerId qid; ///< the sender of the request
};
} // namespace Ipc;
/// \returns 0 if no message kind has been received or set
int rawType() const { return msg_iov ? data.type_ : 0; }
- /* access for Plain Old Data (POD)-based message parts */
- template <class Pod>
- void getPod(Pod &pod) const { getFixed(&pod, sizeof(pod)); } ///< load POD
- template <class Pod>
- void putPod(const Pod &pod) { putFixed(&pod, sizeof(pod)); } ///< store POD
+ /* access for TriviallyCopyable (a.k.a. Plain Old Data or POD) message parts */
+ template <class Pod> void getPod(Pod &pod) const; ///< load POD
+ template <class Pod> void putPod(const Pod &pod); ///< store POD
/* access to message parts for selected commonly-used part types */
void getString(String &s) const; ///< load variable-length string
} // namespace Ipc
+template <class Pod>
+void
+Ipc::TypedMsgHdr::getPod(Pod &pod) const
+{
+ // TODO: Enable after fixing Ipc::SharedListenRequest::SharedListenRequest()
+ //static_assert(std::is_trivially_copyable<Pod>::value, "getPod() used for a POD");
+ getFixed(&pod, sizeof(pod));
+}
+
+template <class Pod>
+void
+Ipc::TypedMsgHdr::putPod(const Pod &pod)
+{
+ // TODO: Enable after fixing Ipc::SharedListenRequest::pack()
+ //static_assert(std::is_trivially_copyable<Pod>::value, "putPod() used for a POD");
+ putFixed(&pod, sizeof(pod));
+}
+
#endif /* SQUID_IPC_TYPED_MSG_HDR_H */
namespace Ipc
{
-class TypedMsgHdr;
-class StrandCoord;
-class StrandMessage;
class Forwarder;
class Inquirer;
+class QuestionerId;
class Request;
+class RequestId;
class Response;
+class StrandCoord;
+class StrandMessage;
+class TypedMsgHdr;
} // namespace Ipc
}
void
-Mgr::Action::sendResponse(unsigned int requestId)
+Mgr::Action::sendResponse(const Ipc::RequestId requestId)
{
Response response(requestId, this);
Ipc::TypedMsgHdr message;
virtual void unpack(const Ipc::TypedMsgHdr &) {}
/// notify Coordinator that this action is done with local processing
- void sendResponse(unsigned int requestId);
+ void sendResponse(Ipc::RequestId);
/* Action properties */
#include "squid.h"
#include "base/TextException.h"
#include "comm/Connection.h"
+#include "ipc/RequestId.h"
#include "mgr/Filler.h"
#include "mgr/Response.h"
#include "Store.h"
CBDATA_NAMESPACED_CLASS_INIT(Mgr, Filler);
Mgr::Filler::Filler(const Action::Pointer &anAction, const Comm::ConnectionPointer &conn,
- unsigned int aRequestId):
+ const Ipc::RequestId aRequestId):
StoreToCommWriter(conn, anAction->createStoreEntry()),
action(anAction),
requestId(aRequestId)
#define SQUID_MGR_FILLER_H
#include "comm/forward.h"
+#include "ipc/forward.h"
#include "mgr/Action.h"
#include "mgr/StoreToCommWriter.h"
CBDATA_CLASS(Filler);
public:
- Filler(const Action::Pointer &anAction, const Comm::ConnectionPointer &conn, unsigned int aRequestId);
+ Filler(const Action::Pointer &, const Comm::ConnectionPointer &, Ipc::RequestId);
protected:
/* AsyncJob API */
private:
Action::Pointer action; ///< action that will run() and sendResponse()
- unsigned int requestId; ///< the ID of the Request we are responding to
+ Ipc::RequestId requestId; ///< the ID of the Request we are responding to
};
} // namespace Mgr
Mgr::Forwarder::Forwarder(const Comm::ConnectionPointer &aConn, const ActionParams &aParams,
HttpRequest* aRequest, StoreEntry* anEntry, const AccessLogEntryPointer &anAle):
- Ipc::Forwarder(new Request(KidIdentifier, 0, aConn, aParams), 10),
+ // TODO: Add virtual Forwarder::makeRequest() to avoid prematurely creating
+ // this dummy request with a dummy ID that are finalized by Ipc::Forwarder.
+ // Same for Snmp::Forwarder.
+ Ipc::Forwarder(new Request(KidIdentifier, Ipc::RequestId(/*XXX*/), aConn, aParams), 10),
httpRequest(aRequest), entry(anEntry), conn(aConn), ale(anAle)
{
debugs(16, 5, HERE << conn);
#include "base/TextException.h"
#include "comm/Connection.h"
#include "globals.h"
+#include "ipc/RequestId.h"
#include "ipc/UdsOp.h"
#include "mgr/Command.h"
#include "mgr/Filler.h"
#include "globals.h"
#include "HttpReply.h"
#include "ipc/Messages.h"
+#include "ipc/RequestId.h"
#include "ipc/TypedMsgHdr.h"
#include "ipc/UdsOp.h"
#include "mgr/Filler.h"
#include "mgr/ActionParams.h"
#include "mgr/Request.h"
-Mgr::Request::Request(int aRequestorId, unsigned int aRequestId, const Comm::ConnectionPointer &aConn,
+Mgr::Request::Request(const int aRequestorId,
+ const Ipc::RequestId aRequestId,
+ const Comm::ConnectionPointer &aConn,
const ActionParams &aParams):
Ipc::Request(aRequestorId, aRequestId),
conn(aConn),
Must(requestorId > 0);
}
-Mgr::Request::Request(const Request& request):
- Ipc::Request(request.requestorId, request.requestId),
- conn(request.conn), params(request.params)
-{
-}
-
-Mgr::Request::Request(const Ipc::TypedMsgHdr& msg):
- Ipc::Request(0, 0)
+Mgr::Request::Request(const Ipc::TypedMsgHdr &msg)
{
msg.checkType(Ipc::mtCacheMgrRequest);
msg.getPod(requestorId);
class Request: public Ipc::Request
{
public:
- Request(int aRequestorId, unsigned int aRequestId, const Comm::ConnectionPointer &aConn,
+ Request(int aRequestorId, Ipc::RequestId, const Comm::ConnectionPointer &aConn,
const ActionParams &aParams);
explicit Request(const Ipc::TypedMsgHdr& msg); ///< from recvmsg()
virtual void pack(Ipc::TypedMsgHdr& msg) const;
virtual Pointer clone() const;
-private:
- Request(const Request& request);
-
public:
Comm::ConnectionPointer conn; ///< HTTP client connection descriptor
#include "base/TextException.h"
#include "CacheManager.h"
#include "ipc/Messages.h"
+#include "ipc/RequestId.h"
#include "ipc/TypedMsgHdr.h"
#include "mgr/ActionCreator.h"
#include "mgr/ActionProfile.h"
#include "mgr/Response.h"
-Mgr::Response::Response(unsigned int aRequestId, Action::Pointer anAction):
+Mgr::Response::Response(const Ipc::RequestId aRequestId, const Action::Pointer anAction):
Ipc::Response(aRequestId), action(anAction)
{
Must(!action || action->name()); // if there is an action, it must be named
}
-Mgr::Response::Response(const Response& response):
- Ipc::Response(response.requestId), action(response.action)
-{
-}
-
-Mgr::Response::Response(const Ipc::TypedMsgHdr& msg):
- Ipc::Response(0)
+Mgr::Response::Response(const Ipc::TypedMsgHdr &msg)
{
msg.checkType(Ipc::mtCacheMgrResponse);
msg.getPod(requestId);
class Response: public Ipc::Response
{
public:
- Response(unsigned int aRequestId, Action::Pointer anAction = NULL);
+ /// sender's constructor
+ Response(Ipc::RequestId, Action::Pointer anAction = nullptr);
explicit Response(const Ipc::TypedMsgHdr& msg); ///< from recvmsg()
bool hasAction() const; ///< whether response contain action object
const Action& getAction() const; ///< returns action object
-private:
- Response(const Response& response);
-
public:
Action::Pointer action; ///< action relating to response
};
Snmp::Forwarder::Forwarder(const Pdu& aPdu, const Session& aSession, int aFd,
const Ip::Address& anAddress):
- Ipc::Forwarder(new Request(KidIdentifier, 0, aPdu, aSession, aFd, anAddress), 2),
+ Ipc::Forwarder(new Request(KidIdentifier, Ipc::RequestId(), aPdu, aSession, aFd, anAddress), 2),
fd(aFd)
{
debugs(49, 5, HERE << "FD " << aFd);
}
void
-Snmp::SendResponse(unsigned int requestId, const Pdu& pdu)
+Snmp::SendResponse(const Ipc::RequestId requestId, const Pdu &pdu)
{
debugs(49, 5, HERE);
// snmpAgentResponse() can modify arg
AsyncCall::Pointer closer; ///< comm_close handler for the connection
};
-void SendResponse(unsigned int requestId, const Pdu& pdu);
+void SendResponse(Ipc::RequestId, const Pdu &);
} // namespace Snmp
#include "ipc/TypedMsgHdr.h"
#include "snmp/Request.h"
-Snmp::Request::Request(int aRequestorId, unsigned int aRequestId,
+Snmp::Request::Request(const int aRequestorId, const Ipc::RequestId aRequestId,
const Pdu& aPdu, const Session& aSession,
int aFd, const Ip::Address& anAddress):
Ipc::Request(aRequestorId, aRequestId),
{
}
-Snmp::Request::Request(const Request& request):
- Ipc::Request(request.requestorId, request.requestId),
- pdu(request.pdu), session(request.session),
- fd(request.fd), address(request.address)
-{
-}
-
-Snmp::Request::Request(const Ipc::TypedMsgHdr& msg):
- Ipc::Request(0, 0)
+Snmp::Request::Request(const Ipc::TypedMsgHdr &msg)
{
msg.checkType(Ipc::mtSnmpRequest);
msg.getPod(requestorId);
class Request: public Ipc::Request
{
public:
- Request(int aRequestorId, unsigned int aRequestId, const Pdu& aPdu,
+ Request(int aRequestorId, Ipc::RequestId aRequestId, const Pdu& aPdu,
const Session& aSession, int aFd, const Ip::Address& anAddress);
explicit Request(const Ipc::TypedMsgHdr& msg); ///< from recvmsg()
virtual void pack(Ipc::TypedMsgHdr& msg) const;
virtual Pointer clone() const;
-private:
- Request(const Request& request);
-
public:
Pdu pdu; ///< SNMP protocol data unit
Session session; ///< SNMP session
#include "squid.h"
#include "base/TextException.h"
#include "ipc/Messages.h"
+#include "ipc/RequestId.h"
#include "ipc/TypedMsgHdr.h"
#include "snmp/Response.h"
-std::ostream& Snmp::operator << (std::ostream& os, const Response& response)
-{
- os << "response: {requestId: " << response.requestId << '}';
- return os;
-}
-
-Snmp::Response::Response(unsigned int aRequestId):
+Snmp::Response::Response(const Ipc::RequestId aRequestId):
Ipc::Response(aRequestId), pdu()
{
}
-Snmp::Response::Response(const Response& response):
- Ipc::Response(response.requestId), pdu(response.pdu)
-{
-}
-
-Snmp::Response::Response(const Ipc::TypedMsgHdr& msg):
- Ipc::Response(0)
+Snmp::Response::Response(const Ipc::TypedMsgHdr &msg)
{
msg.checkType(Ipc::mtSnmpResponse);
msg.getPod(requestId);
class Response: public Ipc::Response
{
public:
- Response(unsigned int aRequestId);
+ explicit Response(Ipc::RequestId); ///< sender's constructor
explicit Response(const Ipc::TypedMsgHdr& msg); ///< from recvmsg()
/* Ipc::Response API */
virtual void pack(Ipc::TypedMsgHdr& msg) const;
virtual Ipc::Response::Pointer clone() const;
-private:
- Response(const Response& response);
-
public:
Pdu pdu; ///< SNMP protocol data unit
};
-std::ostream& operator << (std::ostream& os, const Response& response);
-
} // namespace Snmp
#endif /* SQUID_SNMPX_RESPONSE_H */
#define STUB_API "lmgr/libmgr.la"
#include "tests/STUB.h"
+#include "ipc/RequestId.h"
+
// NP: used by Command.h instantiations
#include "mgr/ActionProfile.h"
void Mgr::Action::fillEntry(StoreEntry *entry, bool writeHttpHeader) STUB
void Mgr::Action::add(const Action &action) STUB
void Mgr::Action::respond(const Request &request) STUB
-void Mgr::Action::sendResponse(unsigned int requestId) STUB
+void Mgr::Action::sendResponse(const Ipc::RequestId) STUB
bool Mgr::Action::atomic() const STUB_RETVAL(false)
const char * Mgr::Action::name() const STUB_RETVAL(NULL)
static Mgr::Command static_Command;