#include "config.h"
+#include "comm.h"
#include "ipc/Coordinator.h"
+#include "ipc/FdNotes.h"
+#include "ipc/SharedListen.h"
CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator);
handleRegistrationRequest(StrandCoord(message));
break;
+ case mtSharedListenRequest:
+ debugs(54, 6, HERE << "Shared listen request");
+ handleSharedListenRequest(SharedListenRequest(message));
+ break;
+
case mtDescriptorGet:
debugs(54, 6, HERE << "Descriptor get request");
handleDescriptorGet(Descriptor(message));
SendMessage(MakeAddr(strandAddrPfx, strand.kidId), message);
}
+void
+Ipc::Coordinator::handleSharedListenRequest(const SharedListenRequest& request)
+{
+ debugs(54, 4, HERE << "kid" << request.requestorId <<
+ " needs shared listen FD for " << request.params.addr);
+ Listeners::const_iterator i = listeners.find(request.params);
+ int errNo = 0;
+ const int sock = (i != listeners.end()) ?
+ i->second : openListenSocket(request, errNo);
+
+ debugs(54, 3, HERE << "sending shared listen FD " << sock << " for " <<
+ request.params.addr << " to kid" << request.requestorId <<
+ " mapId=" << request.mapId);
+
+ SharedListenResponse response(sock, errNo, request.mapId);
+ TypedMsgHdr message;
+ response.pack(message);
+ SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
+}
+
+int
+Ipc::Coordinator::openListenSocket(const SharedListenRequest& request,
+ int &errNo)
+{
+ const OpenListenerParams &p = request.params;
+
+ debugs(54, 6, HERE << "opening listen FD at " << p.addr << " for kid" <<
+ request.requestorId);
+
+ IpAddress addr = p.addr; // comm_open_listener may modify it
+
+ enter_suid();
+ const int sock = comm_open_listener(p.sock_type, p.proto, addr, p.flags,
+ FdNote(p.fdNote));
+ errNo = (sock >= 0) ? 0 : errno;
+ leave_suid();
+
+ // cache positive results
+ if (sock >= 0)
+ listeners[request.params] = sock;
+
+ return sock;
+}
+
void Ipc::Coordinator::handleDescriptorGet(const Descriptor& request)
{
// XXX: hack: create descriptor here
#include "Array.h"
+#include <map>
#include "ipc/Port.h"
#include "ipc/Messages.h"
+#include "ipc/SharedListen.h"
namespace Ipc
{
void registerStrand(const StrandCoord &); ///< adds or updates existing
void handleRegistrationRequest(const StrandCoord &); ///< register,ACK
+ /// returns cached socket or calls openListenSocket()
+ void handleSharedListenRequest(const SharedListenRequest& request);
void handleDescriptorGet(const Descriptor& request);
+ /// calls comm_open_listener()
+ int openListenSocket(const SharedListenRequest& request, int &errNo);
+
private:
typedef Vector<StrandCoord> Strands; ///< unsorted strands
Strands strands; ///< registered processes and threads
+
+ typedef std::map<OpenListenerParams, int> Listeners; ///< params:fd map
+ Listeners listeners; ///< cached comm_open_listener() results
+
static Coordinator* TheInstance; ///< the only class instance in existence
CBDATA_CLASS2(Coordinator);
--- /dev/null
+/*
+ * $Id$
+ *
+ * DEBUG: section 54 Interprocess Communication
+ *
+ */
+
+#include "config.h"
+#include "Debug.h"
+#include "ipc/FdNotes.h"
+
+
+const char *
+Ipc::FdNote(int fdNoteId)
+{
+ static const char *FdNotes[Ipc::fdnEnd] = {
+ "None", // fdnNone
+ "HTTP Socket" // fdnHttpSocket
+ };
+
+ if (fdnNone < fdNoteId && fdNoteId < fdnEnd)
+ return FdNotes[fdNoteId];
+
+ debugs(54, 1, HERE << "salvaged bug: wrong fd_note ID: " << fdNoteId);
+ return FdNotes[fdnNone];
+}
--- /dev/null
+/*
+ * $Id$
+ *
+ * DEBUG: section 54 Interprocess Communication
+ *
+ */
+
+#ifndef SQUID_IPC_FD_NOTES_H
+#define SQUID_IPC_FD_NOTES_H
+
+namespace Ipc
+{
+
+/// We cannot send char* FD notes to other processes. Pass int IDs and convert.
+
+typedef enum { fdnNone, fdnHttpSocket, fdnEnd } FdNoteId; ///< fd_note() label
+
+extern const char *FdNote(int fdNodeId); ///< converts FdNoteId into a string
+
+} // namespace Ipc;
+
+
+#endif /* SQUID_IPC_FD_NOTES_H */
noinst_LTLIBRARIES = libipc.la
libipc_la_SOURCES = \
+ FdNotes.cc \
+ FdNotes.h \
Kid.cc \
Kid.h \
Kids.cc \
Kids.h \
Messages.cc \
Messages.h \
+ StartListening.cc \
+ StartListening.h \
+ SharedListen.cc \
+ SharedListen.h \
TypedMsgHdr.cc \
TypedMsgHdr.h \
Coordinator.cc \
class TypedMsgHdr;
typedef enum { mtNone = 0, mtRegistration,
+ mtSharedListenRequest, mtSharedListenResponse,
mtDescriptorGet, mtDescriptorPut } MessageType;
/// Strand location details
--- /dev/null
+/*
+ * $Id$
+ *
+ * DEBUG: section 54 Interprocess Communication
+ *
+ */
+
+#include "config.h"
+#include <map>
+#include "comm.h"
+#include "ipc/Port.h"
+#include "ipc/Messages.h"
+#include "ipc/Kids.h"
+#include "ipc/TypedMsgHdr.h"
+#include "ipc/StartListening.h"
+#include "ipc/SharedListen.h"
+
+
+/// holds information necessary to handle JoinListen response
+class PendingOpenRequest
+{
+public:
+ Ipc::OpenListenerParams params; ///< actual comm_open_sharedListen() parameters
+ AsyncCall::Pointer callback; // who to notify
+};
+
+/// maps ID assigned at request time to the response callback
+typedef std::map<int, PendingOpenRequest> SharedListenRequestMap;
+static SharedListenRequestMap TheSharedListenRequestMap;
+
+static int
+AddToMap(const PendingOpenRequest &por)
+{
+ // find unused ID using linear seach; there should not be many entries
+ for (int id = 0; true; ++id) {
+ if (TheSharedListenRequestMap.find(id) == TheSharedListenRequestMap.end()) {
+ TheSharedListenRequestMap[id] = por;
+ return id;
+ }
+ }
+ assert(false); // not reached
+ return -1;
+}
+
+Ipc::OpenListenerParams::OpenListenerParams()
+{
+ xmemset(this, 0, sizeof(*this));
+}
+
+bool
+Ipc::OpenListenerParams::operator <(const OpenListenerParams &p) const
+{
+ if (sock_type != p.sock_type)
+ return sock_type < p.sock_type;
+
+ if (proto != p.proto)
+ return proto < p.proto;
+
+ // ignore flags and fdNote differences because they do not affect binding
+
+ return addr.compareWhole(p.addr) < 0;
+}
+
+
+
+Ipc::SharedListenRequest::SharedListenRequest(): requestorId(-1), mapId(-1)
+{
+ // caller will then set public data members
+}
+
+Ipc::SharedListenRequest::SharedListenRequest(const TypedMsgHdr &hdrMsg)
+{
+ hdrMsg.getData(mtSharedListenRequest, this, sizeof(*this));
+}
+
+void Ipc::SharedListenRequest::pack(TypedMsgHdr &hdrMsg) const
+{
+ hdrMsg.putData(mtSharedListenRequest, this, sizeof(*this));
+}
+
+
+Ipc::SharedListenResponse::SharedListenResponse(int aFd, int anErrNo, int aMapId):
+ fd(aFd), errNo(anErrNo), mapId(aMapId)
+{
+}
+
+Ipc::SharedListenResponse::SharedListenResponse(const TypedMsgHdr &hdrMsg):
+ fd(-1), errNo(0), mapId(-1)
+{
+ hdrMsg.getData(mtSharedListenResponse, this, sizeof(*this));
+ fd = hdrMsg.getFd();
+}
+
+void Ipc::SharedListenResponse::pack(TypedMsgHdr &hdrMsg) const
+{
+ hdrMsg.putData(mtSharedListenResponse, this, sizeof(*this));
+ hdrMsg.putFd(fd);
+}
+
+
+void Ipc::JoinSharedListen(const OpenListenerParams ¶ms,
+ AsyncCall::Pointer &callback)
+{
+ PendingOpenRequest por;
+ por.params = params;
+ por.callback = callback;
+
+ SharedListenRequest request;
+ request.requestorId = KidIdentifier;
+ request.params = por.params;
+ request.mapId = AddToMap(por);
+
+ debugs(54, 3, HERE << "getting listening FD for " << request.params.addr <<
+ " mapId=" << request.mapId);
+
+ TypedMsgHdr message;
+ request.pack(message);
+ SendMessage(coordinatorAddr, message);
+}
+
+void Ipc::SharedListenJoined(const SharedListenResponse &response)
+{
+ const int fd = response.fd;
+
+ debugs(54, 3, HERE << "got listening FD " << fd << " errNo=" <<
+ response.errNo << " mapId=" << response.mapId);
+
+ Must(TheSharedListenRequestMap.find(response.mapId) != TheSharedListenRequestMap.end());
+ PendingOpenRequest por = TheSharedListenRequestMap[response.mapId];
+ Must(por.callback != NULL);
+ TheSharedListenRequestMap.erase(response.mapId);
+
+ if (fd >= 0) {
+ OpenListenerParams &p = por.params;
+ struct addrinfo *AI = NULL;
+ p.addr.GetAddrInfo(AI);
+ AI->ai_socktype = p.sock_type;
+ AI->ai_protocol = p.proto;
+ comm_import_opened(fd, p.addr, p.flags, FdNote(p.fdNote), AI);
+ p.addr.FreeAddrInfo(AI);
+ }
+
+ StartListeningCb *cbd =
+ dynamic_cast<StartListeningCb*>(por.callback->getDialer());
+ Must(cbd);
+ cbd->fd = fd;
+ cbd->errNo = response.errNo;
+ ScheduleCallHere(por.callback);
+}
--- /dev/null
+/*
+ * $Id$
+ *
+ * DEBUG: section 54 Interprocess Communication
+ *
+ */
+
+#ifndef SQUID_IPC_SHARED_LISTEN_H
+#define SQUID_IPC_SHARED_LISTEN_H
+
+#include "base/AsyncCall.h"
+
+namespace Ipc
+{
+
+/// "shared listen" is when concurrent processes are listening on the same fd
+
+/// comm_open_listener() parameters holder
+class OpenListenerParams
+{
+public:
+ OpenListenerParams();
+
+ bool operator <(const OpenListenerParams &p) const; ///< useful for map<>
+
+ int sock_type;
+ int proto;
+ IpAddress addr; ///< will be memset and memcopied
+ int flags;
+ int fdNote; ///< index into fd_note() comment strings
+};
+
+class TypedMsgHdr;
+
+/// a request for a listen socket with given parameters
+class SharedListenRequest
+{
+public:
+ SharedListenRequest(); ///< from OpenSharedListen() which then sets public data
+ explicit SharedListenRequest(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
+ void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
+
+public:
+ int requestorId; ///< kidId of the requestor
+
+ OpenListenerParams params; ///< actual comm_open_sharedListen() parameters
+
+ int mapId; ///< to map future response to the requestor's callback
+};
+
+/// a response to SharedListenRequest
+class SharedListenResponse
+{
+public:
+ SharedListenResponse(int fd, int errNo, int mapId);
+ explicit SharedListenResponse(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
+ void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
+
+public:
+ int fd; ///< opened listening socket or -1
+ int errNo; ///< errno value from comm_open_sharedListen() call
+ int mapId; ///< to map future response to the requestor's callback
+};
+
+/// prepare and send SharedListenRequest to Coordinator
+extern void JoinSharedListen(const OpenListenerParams &, AsyncCall::Pointer &);
+
+/// process Coordinator response to SharedListenRequest
+extern void SharedListenJoined(const SharedListenResponse &response);
+
+} // namespace Ipc;
+
+
+#endif /* SQUID_IPC_SHARED_LISTEN_H */
--- /dev/null
+/*
+ * $Id$
+ *
+ * DEBUG: section 54 Interprocess Communication
+ *
+ */
+
+#include "config.h"
+#include "comm.h"
+#include "TextException.h"
+#include "ipc/SharedListen.h"
+#include "ipc/StartListening.h"
+
+
+Ipc::StartListeningCb::StartListeningCb(): fd(-1), errNo(0)
+{
+}
+
+Ipc::StartListeningCb::~StartListeningCb()
+{
+}
+
+std::ostream &Ipc::StartListeningCb::startPrint(std::ostream &os) const
+{
+ return os << "(FD " << fd << ", err=" << errNo;
+}
+
+
+void Ipc::StartListening(int sock_type, int proto, IpAddress &addr,
+ int flags, FdNoteId fdNote, AsyncCall::Pointer &callback)
+{
+ OpenListenerParams p;
+ p.sock_type = sock_type;
+ p.proto = proto;
+ p.addr = addr;
+ p.flags = flags;
+ p.fdNote = fdNote;
+
+ if (!opt_no_daemon && Config.main_processes > 1) { // if SMP is on, share
+ Ipc::JoinSharedListen(p, callback);
+ return; // wait for the call back
+ }
+
+ enter_suid();
+ const int sock = comm_open_listener(p.sock_type, p.proto, p.addr, p.flags,
+ FdNote(p.fdNote));
+ const int errNo = (sock >= 0) ? 0 : errno;
+ leave_suid();
+
+ debugs(54, 3, HERE << "opened listen FD " << sock << " for " << p.addr);
+
+ StartListeningCb *cbd =
+ dynamic_cast<StartListeningCb*>(callback->getDialer());
+ Must(cbd);
+ cbd->fd = sock;
+ cbd->errNo = errNo;
+ ScheduleCallHere(callback);
+}
--- /dev/null
+/*
+ * $Id$
+ *
+ * DEBUG: section 54 Interprocess Communication
+ *
+ */
+
+#ifndef SQUID_IPC_START_LISTENING_H
+#define SQUID_IPC_START_LISTENING_H
+
+#include <iosfwd>
+#include "ipc/FdNotes.h"
+#include "base/AsyncCall.h"
+
+class IpAddress;
+
+namespace Ipc
+{
+
+/// common API for all StartListening() callbacks
+class StartListeningCb
+{
+public:
+ StartListeningCb();
+ virtual ~StartListeningCb();
+
+ /// starts printing arguments, return os
+ std::ostream &startPrint(std::ostream &os) const;
+
+public:
+ int fd; ///< opened listening socket or -1
+ int errNo; ///< errno value from the comm_open_listener() call
+};
+
+/// Depending on whether SMP is on, either ask Coordinator to send us
+/// the listening FD or call comm_open_listener() directly.
+extern void StartListening(int sock_type, int proto, IpAddress &addr,
+ int flags, FdNoteId fdNote, AsyncCall::Pointer &callback);
+
+} // namespace Ipc;
+
+
+#endif /* SQUID_IPC_START_LISTENING_H */
#include "config.h"
#include "ipc/Strand.h"
#include "ipc/Messages.h"
+#include "ipc/SharedListen.h"
#include "ipc/Kids.h"
handleRegistrationResponse(StrandCoord(message));
break;
+ case mtSharedListenResponse:
+ SharedListenJoined(SharedListenResponse(message));
+ break;
+
case mtDescriptorPut:
putDescriptor(Descriptor(message));
break;