From: Alex Rousskov Date: Sun, 2 May 2010 18:49:25 +0000 (-0600) Subject: Support a "shared listen" concept when multiple concurrent processes listen X-Git-Tag: SQUID_3_2_0_1~93^2~17 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=0d0bce6a6804adc0baea2b016e63337ab97451fb;p=thirdparty%2Fsquid.git Support a "shared listen" concept when multiple concurrent processes listen on the same socket. The Coordinator is responsible for opening and caching listening sockets, using comm_open_listener() parameters supplied by remote callers. Sendmsg/recvmsg is used to shovel socket descriptors from Coordinator to remote callers. If SMP is not enabled, we call the local comm_open_listener() as usual but return the results asynchronously to avoid making SMP/nonSMP special in the caller code. --- diff --git a/src/ipc/Coordinator.cc b/src/ipc/Coordinator.cc index a232a83b5f..f1d414c80a 100644 --- a/src/ipc/Coordinator.cc +++ b/src/ipc/Coordinator.cc @@ -7,7 +7,10 @@ #include "config.h" +#include "comm.h" #include "ipc/Coordinator.h" +#include "ipc/FdNotes.h" +#include "ipc/SharedListen.h" CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator); @@ -50,6 +53,11 @@ void Ipc::Coordinator::receive(const TypedMsgHdr& message) handleRegistrationRequest(StrandCoord(message)); break; + case mtSharedListenRequest: + debugs(54, 6, HERE << "Shared listen request"); + handleSharedListenRequest(SharedListenRequest(message)); + break; + case mtDescriptorGet: debugs(54, 6, HERE << "Descriptor get request"); handleDescriptorGet(Descriptor(message)); @@ -71,6 +79,50 @@ void Ipc::Coordinator::handleRegistrationRequest(const StrandCoord& strand) SendMessage(MakeAddr(strandAddrPfx, strand.kidId), message); } +void +Ipc::Coordinator::handleSharedListenRequest(const SharedListenRequest& request) +{ + debugs(54, 4, HERE << "kid" << request.requestorId << + " needs shared listen FD for " << request.params.addr); + Listeners::const_iterator i = listeners.find(request.params); + int errNo = 0; + const int sock = (i != listeners.end()) ? + i->second : openListenSocket(request, errNo); + + debugs(54, 3, HERE << "sending shared listen FD " << sock << " for " << + request.params.addr << " to kid" << request.requestorId << + " mapId=" << request.mapId); + + SharedListenResponse response(sock, errNo, request.mapId); + TypedMsgHdr message; + response.pack(message); + SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message); +} + +int +Ipc::Coordinator::openListenSocket(const SharedListenRequest& request, + int &errNo) +{ + const OpenListenerParams &p = request.params; + + debugs(54, 6, HERE << "opening listen FD at " << p.addr << " for kid" << + request.requestorId); + + IpAddress addr = p.addr; // comm_open_listener may modify it + + enter_suid(); + const int sock = comm_open_listener(p.sock_type, p.proto, addr, p.flags, + FdNote(p.fdNote)); + errNo = (sock >= 0) ? 0 : errno; + leave_suid(); + + // cache positive results + if (sock >= 0) + listeners[request.params] = sock; + + return sock; +} + void Ipc::Coordinator::handleDescriptorGet(const Descriptor& request) { // XXX: hack: create descriptor here diff --git a/src/ipc/Coordinator.h b/src/ipc/Coordinator.h index 80e150a768..6de7394202 100644 --- a/src/ipc/Coordinator.h +++ b/src/ipc/Coordinator.h @@ -10,8 +10,10 @@ #include "Array.h" +#include #include "ipc/Port.h" #include "ipc/Messages.h" +#include "ipc/SharedListen.h" namespace Ipc { @@ -35,11 +37,20 @@ protected: void registerStrand(const StrandCoord &); ///< adds or updates existing void handleRegistrationRequest(const StrandCoord &); ///< register,ACK + /// returns cached socket or calls openListenSocket() + void handleSharedListenRequest(const SharedListenRequest& request); void handleDescriptorGet(const Descriptor& request); + /// calls comm_open_listener() + int openListenSocket(const SharedListenRequest& request, int &errNo); + private: typedef Vector Strands; ///< unsorted strands Strands strands; ///< registered processes and threads + + typedef std::map Listeners; ///< params:fd map + Listeners listeners; ///< cached comm_open_listener() results + static Coordinator* TheInstance; ///< the only class instance in existence CBDATA_CLASS2(Coordinator); diff --git a/src/ipc/FdNotes.cc b/src/ipc/FdNotes.cc new file mode 100644 index 0000000000..1ab4cd3265 --- /dev/null +++ b/src/ipc/FdNotes.cc @@ -0,0 +1,26 @@ +/* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + +#include "config.h" +#include "Debug.h" +#include "ipc/FdNotes.h" + + +const char * +Ipc::FdNote(int fdNoteId) +{ + static const char *FdNotes[Ipc::fdnEnd] = { + "None", // fdnNone + "HTTP Socket" // fdnHttpSocket + }; + + if (fdnNone < fdNoteId && fdNoteId < fdnEnd) + return FdNotes[fdNoteId]; + + debugs(54, 1, HERE << "salvaged bug: wrong fd_note ID: " << fdNoteId); + return FdNotes[fdnNone]; +} diff --git a/src/ipc/FdNotes.h b/src/ipc/FdNotes.h new file mode 100644 index 0000000000..8fa5cb72d1 --- /dev/null +++ b/src/ipc/FdNotes.h @@ -0,0 +1,23 @@ +/* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + +#ifndef SQUID_IPC_FD_NOTES_H +#define SQUID_IPC_FD_NOTES_H + +namespace Ipc +{ + +/// We cannot send char* FD notes to other processes. Pass int IDs and convert. + +typedef enum { fdnNone, fdnHttpSocket, fdnEnd } FdNoteId; ///< fd_note() label + +extern const char *FdNote(int fdNodeId); ///< converts FdNoteId into a string + +} // namespace Ipc; + + +#endif /* SQUID_IPC_FD_NOTES_H */ diff --git a/src/ipc/Makefile.am b/src/ipc/Makefile.am index 3c901415c9..c81662c482 100644 --- a/src/ipc/Makefile.am +++ b/src/ipc/Makefile.am @@ -4,12 +4,18 @@ include $(top_srcdir)/src/TestHeaders.am noinst_LTLIBRARIES = libipc.la libipc_la_SOURCES = \ + FdNotes.cc \ + FdNotes.h \ Kid.cc \ Kid.h \ Kids.cc \ Kids.h \ Messages.cc \ Messages.h \ + StartListening.cc \ + StartListening.h \ + SharedListen.cc \ + SharedListen.h \ TypedMsgHdr.cc \ TypedMsgHdr.h \ Coordinator.cc \ diff --git a/src/ipc/Messages.h b/src/ipc/Messages.h index d545b21b74..31cb6cb165 100644 --- a/src/ipc/Messages.h +++ b/src/ipc/Messages.h @@ -20,6 +20,7 @@ namespace Ipc class TypedMsgHdr; typedef enum { mtNone = 0, mtRegistration, + mtSharedListenRequest, mtSharedListenResponse, mtDescriptorGet, mtDescriptorPut } MessageType; /// Strand location details diff --git a/src/ipc/SharedListen.cc b/src/ipc/SharedListen.cc new file mode 100644 index 0000000000..9e46fcb6e5 --- /dev/null +++ b/src/ipc/SharedListen.cc @@ -0,0 +1,149 @@ +/* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + +#include "config.h" +#include +#include "comm.h" +#include "ipc/Port.h" +#include "ipc/Messages.h" +#include "ipc/Kids.h" +#include "ipc/TypedMsgHdr.h" +#include "ipc/StartListening.h" +#include "ipc/SharedListen.h" + + +/// holds information necessary to handle JoinListen response +class PendingOpenRequest +{ +public: + Ipc::OpenListenerParams params; ///< actual comm_open_sharedListen() parameters + AsyncCall::Pointer callback; // who to notify +}; + +/// maps ID assigned at request time to the response callback +typedef std::map SharedListenRequestMap; +static SharedListenRequestMap TheSharedListenRequestMap; + +static int +AddToMap(const PendingOpenRequest &por) +{ + // find unused ID using linear seach; there should not be many entries + for (int id = 0; true; ++id) { + if (TheSharedListenRequestMap.find(id) == TheSharedListenRequestMap.end()) { + TheSharedListenRequestMap[id] = por; + return id; + } + } + assert(false); // not reached + return -1; +} + +Ipc::OpenListenerParams::OpenListenerParams() +{ + xmemset(this, 0, sizeof(*this)); +} + +bool +Ipc::OpenListenerParams::operator <(const OpenListenerParams &p) const +{ + if (sock_type != p.sock_type) + return sock_type < p.sock_type; + + if (proto != p.proto) + return proto < p.proto; + + // ignore flags and fdNote differences because they do not affect binding + + return addr.compareWhole(p.addr) < 0; +} + + + +Ipc::SharedListenRequest::SharedListenRequest(): requestorId(-1), mapId(-1) +{ + // caller will then set public data members +} + +Ipc::SharedListenRequest::SharedListenRequest(const TypedMsgHdr &hdrMsg) +{ + hdrMsg.getData(mtSharedListenRequest, this, sizeof(*this)); +} + +void Ipc::SharedListenRequest::pack(TypedMsgHdr &hdrMsg) const +{ + hdrMsg.putData(mtSharedListenRequest, this, sizeof(*this)); +} + + +Ipc::SharedListenResponse::SharedListenResponse(int aFd, int anErrNo, int aMapId): + fd(aFd), errNo(anErrNo), mapId(aMapId) +{ +} + +Ipc::SharedListenResponse::SharedListenResponse(const TypedMsgHdr &hdrMsg): + fd(-1), errNo(0), mapId(-1) +{ + hdrMsg.getData(mtSharedListenResponse, this, sizeof(*this)); + fd = hdrMsg.getFd(); +} + +void Ipc::SharedListenResponse::pack(TypedMsgHdr &hdrMsg) const +{ + hdrMsg.putData(mtSharedListenResponse, this, sizeof(*this)); + hdrMsg.putFd(fd); +} + + +void Ipc::JoinSharedListen(const OpenListenerParams ¶ms, + AsyncCall::Pointer &callback) +{ + PendingOpenRequest por; + por.params = params; + por.callback = callback; + + SharedListenRequest request; + request.requestorId = KidIdentifier; + request.params = por.params; + request.mapId = AddToMap(por); + + debugs(54, 3, HERE << "getting listening FD for " << request.params.addr << + " mapId=" << request.mapId); + + TypedMsgHdr message; + request.pack(message); + SendMessage(coordinatorAddr, message); +} + +void Ipc::SharedListenJoined(const SharedListenResponse &response) +{ + const int fd = response.fd; + + debugs(54, 3, HERE << "got listening FD " << fd << " errNo=" << + response.errNo << " mapId=" << response.mapId); + + Must(TheSharedListenRequestMap.find(response.mapId) != TheSharedListenRequestMap.end()); + PendingOpenRequest por = TheSharedListenRequestMap[response.mapId]; + Must(por.callback != NULL); + TheSharedListenRequestMap.erase(response.mapId); + + if (fd >= 0) { + OpenListenerParams &p = por.params; + struct addrinfo *AI = NULL; + p.addr.GetAddrInfo(AI); + AI->ai_socktype = p.sock_type; + AI->ai_protocol = p.proto; + comm_import_opened(fd, p.addr, p.flags, FdNote(p.fdNote), AI); + p.addr.FreeAddrInfo(AI); + } + + StartListeningCb *cbd = + dynamic_cast(por.callback->getDialer()); + Must(cbd); + cbd->fd = fd; + cbd->errNo = response.errNo; + ScheduleCallHere(por.callback); +} diff --git a/src/ipc/SharedListen.h b/src/ipc/SharedListen.h new file mode 100644 index 0000000000..3b77b88a5f --- /dev/null +++ b/src/ipc/SharedListen.h @@ -0,0 +1,74 @@ +/* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + +#ifndef SQUID_IPC_SHARED_LISTEN_H +#define SQUID_IPC_SHARED_LISTEN_H + +#include "base/AsyncCall.h" + +namespace Ipc +{ + +/// "shared listen" is when concurrent processes are listening on the same fd + +/// comm_open_listener() parameters holder +class OpenListenerParams +{ +public: + OpenListenerParams(); + + bool operator <(const OpenListenerParams &p) const; ///< useful for map<> + + int sock_type; + int proto; + IpAddress addr; ///< will be memset and memcopied + int flags; + int fdNote; ///< index into fd_note() comment strings +}; + +class TypedMsgHdr; + +/// a request for a listen socket with given parameters +class SharedListenRequest +{ +public: + SharedListenRequest(); ///< from OpenSharedListen() which then sets public data + explicit SharedListenRequest(const TypedMsgHdr &hdrMsg); ///< from recvmsg() + void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg() + +public: + int requestorId; ///< kidId of the requestor + + OpenListenerParams params; ///< actual comm_open_sharedListen() parameters + + int mapId; ///< to map future response to the requestor's callback +}; + +/// a response to SharedListenRequest +class SharedListenResponse +{ +public: + SharedListenResponse(int fd, int errNo, int mapId); + explicit SharedListenResponse(const TypedMsgHdr &hdrMsg); ///< from recvmsg() + void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg() + +public: + int fd; ///< opened listening socket or -1 + int errNo; ///< errno value from comm_open_sharedListen() call + int mapId; ///< to map future response to the requestor's callback +}; + +/// prepare and send SharedListenRequest to Coordinator +extern void JoinSharedListen(const OpenListenerParams &, AsyncCall::Pointer &); + +/// process Coordinator response to SharedListenRequest +extern void SharedListenJoined(const SharedListenResponse &response); + +} // namespace Ipc; + + +#endif /* SQUID_IPC_SHARED_LISTEN_H */ diff --git a/src/ipc/StartListening.cc b/src/ipc/StartListening.cc new file mode 100644 index 0000000000..764be9e6cc --- /dev/null +++ b/src/ipc/StartListening.cc @@ -0,0 +1,58 @@ +/* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + +#include "config.h" +#include "comm.h" +#include "TextException.h" +#include "ipc/SharedListen.h" +#include "ipc/StartListening.h" + + +Ipc::StartListeningCb::StartListeningCb(): fd(-1), errNo(0) +{ +} + +Ipc::StartListeningCb::~StartListeningCb() +{ +} + +std::ostream &Ipc::StartListeningCb::startPrint(std::ostream &os) const +{ + return os << "(FD " << fd << ", err=" << errNo; +} + + +void Ipc::StartListening(int sock_type, int proto, IpAddress &addr, + int flags, FdNoteId fdNote, AsyncCall::Pointer &callback) +{ + OpenListenerParams p; + p.sock_type = sock_type; + p.proto = proto; + p.addr = addr; + p.flags = flags; + p.fdNote = fdNote; + + if (!opt_no_daemon && Config.main_processes > 1) { // if SMP is on, share + Ipc::JoinSharedListen(p, callback); + return; // wait for the call back + } + + enter_suid(); + const int sock = comm_open_listener(p.sock_type, p.proto, p.addr, p.flags, + FdNote(p.fdNote)); + const int errNo = (sock >= 0) ? 0 : errno; + leave_suid(); + + debugs(54, 3, HERE << "opened listen FD " << sock << " for " << p.addr); + + StartListeningCb *cbd = + dynamic_cast(callback->getDialer()); + Must(cbd); + cbd->fd = sock; + cbd->errNo = errNo; + ScheduleCallHere(callback); +} diff --git a/src/ipc/StartListening.h b/src/ipc/StartListening.h new file mode 100644 index 0000000000..9bf8e44f11 --- /dev/null +++ b/src/ipc/StartListening.h @@ -0,0 +1,43 @@ +/* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + +#ifndef SQUID_IPC_START_LISTENING_H +#define SQUID_IPC_START_LISTENING_H + +#include +#include "ipc/FdNotes.h" +#include "base/AsyncCall.h" + +class IpAddress; + +namespace Ipc +{ + +/// common API for all StartListening() callbacks +class StartListeningCb +{ +public: + StartListeningCb(); + virtual ~StartListeningCb(); + + /// starts printing arguments, return os + std::ostream &startPrint(std::ostream &os) const; + +public: + int fd; ///< opened listening socket or -1 + int errNo; ///< errno value from the comm_open_listener() call +}; + +/// Depending on whether SMP is on, either ask Coordinator to send us +/// the listening FD or call comm_open_listener() directly. +extern void StartListening(int sock_type, int proto, IpAddress &addr, + int flags, FdNoteId fdNote, AsyncCall::Pointer &callback); + +} // namespace Ipc; + + +#endif /* SQUID_IPC_START_LISTENING_H */ diff --git a/src/ipc/Strand.cc b/src/ipc/Strand.cc index 3bf75c7dfa..c8f2c4119c 100644 --- a/src/ipc/Strand.cc +++ b/src/ipc/Strand.cc @@ -8,6 +8,7 @@ #include "config.h" #include "ipc/Strand.h" #include "ipc/Messages.h" +#include "ipc/SharedListen.h" #include "ipc/Kids.h" @@ -45,6 +46,10 @@ void Ipc::Strand::receive(const TypedMsgHdr &message) handleRegistrationResponse(StrandCoord(message)); break; + case mtSharedListenResponse: + SharedListenJoined(SharedListenResponse(message)); + break; + case mtDescriptorPut: putDescriptor(Descriptor(message)); break;