From 1bac0258ae64bb1687d4efba9669c478d853ff76 Mon Sep 17 00:00:00 2001 From: Alex Rousskov Date: Thu, 29 Apr 2010 14:12:03 -0600 Subject: [PATCH] Switched from sendto/recvfrom to sendmsg/recvmsg for UDS I/O. Replaced inlined sendto/recvfrom hacks with FD_WRITE/READ_METHOD-based code. A common msghdr-based interface allows us to use the same API for regular IPC messages and for future messages that pass socket descriptors. While msghdr allows for complex vector-based scatter/gather I/O, the IPC code limits complexity by using a single-element I/O vector and a control message part dedicated to passing descriptors. Added a temporary hack to block-sleep between IPC message sending attempts so that we do not use up all the allowed attempts in a short period of time. The hack will be replaced with a non-blocking addEvent-based sleep. --- src/comm.cc | 10 ++----- src/enums.h | 1 + src/fd.cc | 35 ++++++++++++++++++++++-- src/fde.h | 1 - src/ipc/Coordinator.cc | 24 +++++++++-------- src/ipc/Coordinator.h | 13 ++++----- src/ipc/Makefile.am | 4 +++ src/ipc/Messages.cc | 31 +++++++++++++++++++++ src/ipc/Messages.h | 53 ++++++++++++++++++++++++++++++++++++ src/ipc/Port.cc | 5 ++-- src/ipc/Port.h | 4 +-- src/ipc/Strand.cc | 12 +++++---- src/ipc/Strand.h | 5 ++-- src/ipc/UdsOp.cc | 61 +++++++++++++++++------------------------- src/ipc/UdsOp.h | 57 +++++++++++---------------------------- 15 files changed, 198 insertions(+), 118 deletions(-) create mode 100644 src/ipc/Messages.cc create mode 100644 src/ipc/Messages.h diff --git a/src/comm.cc b/src/comm.cc index f2dd988152..8ce4c9a8e9 100644 --- a/src/comm.cc +++ b/src/comm.cc @@ -1883,11 +1883,7 @@ commHandleWrite(int fd, void *data) (long int) state->offset << ", sz " << (long int) state->size << "."); nleft = state->size - state->offset; - // XXX: tmp hack: call sendto() for unconnected UDS sockets - if (!fd_table[fd].flags.called_connect && fd_table[fd].unix_addr.sun_family == AF_LOCAL) - len = sendto(fd, state->buf + 
state->offset, nleft, 0, (sockaddr*)&fd_table[fd].unix_addr, SUN_LEN(&fd_table[fd].unix_addr)); - else - len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft); + len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft); debugs(5, 5, "commHandleWrite: write() returns " << len); fd_bytes(fd, len, FD_WRITE); statCounter.syscalls.sock.writes++; @@ -2464,14 +2460,12 @@ comm_open_uds(int sock_type, debugs(50, 5, HERE << "FD " << new_socket << " is a new socket"); assert(!isOpen(new_socket)); - fd_open(new_socket, FD_SOCKET, NULL); + fd_open(new_socket, FD_MSGHDR, NULL); fdd_table[new_socket].close_file = NULL; fdd_table[new_socket].close_line = 0; - fd_table[new_socket].unix_addr = *addr; - fd_table[new_socket].sock_family = AI.ai_family; if (!(flags & COMM_NOCLOEXEC)) diff --git a/src/enums.h b/src/enums.h index d7aa6f8b0a..dcdf542d5b 100644 --- a/src/enums.h +++ b/src/enums.h @@ -69,6 +69,7 @@ enum fd_type { FD_FILE, FD_SOCKET, FD_PIPE, + FD_MSGHDR, FD_UNKNOWN }; diff --git a/src/fd.cc b/src/fd.cc index 144d36b720..a22b38b7cd 100644 --- a/src/fd.cc +++ b/src/fd.cc @@ -45,6 +45,9 @@ int socket_read_method(int, char *, int); int socket_write_method(int, const char *, int); int file_read_method(int, char *, int); int file_write_method(int, const char *, int); +#else +int msghdr_read_method(int, char *, int); +int msghdr_write_method(int, const char *, int); #endif const char *fdTypeStr[] = { @@ -53,6 +56,7 @@ const char *fdTypeStr[] = { "File", "Socket", "Pipe", + "MsgHdr", "Unknown" }; @@ -169,6 +173,24 @@ default_write_method(int fd, const char *buf, int len) return i; } +int +msghdr_read_method(int fd, char *buf, int len) +{ + PROF_start(read); + const int i = recvmsg(fd, reinterpret_cast<msghdr*>(buf), MSG_DONTWAIT); + PROF_stop(read); + return i; +} + +int +msghdr_write_method(int fd, const char *buf, int len) +{ + PROF_start(write); + const int i = sendmsg(fd, reinterpret_cast<const msghdr*>(buf), MSG_NOSIGNAL); + PROF_stop(write); + return i > 0 ? 
len : i; // len is imprecise but the caller expects a match +} + #endif void @@ -213,9 +235,18 @@ fd_open(int fd, unsigned int type, const char *desc) } #else - F->read_method = &default_read_method; + switch (type) { - F->write_method = &default_write_method; + case FD_MSGHDR: + F->read_method = &msghdr_read_method; + F->write_method = &msghdr_write_method; + break; + + default: + F->read_method = &default_read_method; + F->write_method = &default_write_method; + break; + } #endif diff --git a/src/fde.h b/src/fde.h index 6a7365c041..a0b903d1a3 100644 --- a/src/fde.h +++ b/src/fde.h @@ -60,7 +60,6 @@ public: int sock_family; char ipaddr[MAX_IPSTRLEN]; /* dotted decimal address of peer */ char desc[FD_DESC_SZ]; - struct sockaddr_un unix_addr; ///< unix domain socket (UDS) address, if any struct { unsigned int open:1; diff --git a/src/ipc/Coordinator.cc b/src/ipc/Coordinator.cc index 6976b1c5ab..02f91a70c3 100644 --- a/src/ipc/Coordinator.cc +++ b/src/ipc/Coordinator.cc @@ -24,29 +24,30 @@ void Ipc::Coordinator::start() Port::start(); } -Ipc::StrandData* Ipc::Coordinator::findStrand(int kidId) +Ipc::StrandCoord* Ipc::Coordinator::findStrand(int kidId) { - for (Vector::iterator iter = strands.begin(); iter != strands.end(); ++iter) { + typedef Strands::iterator SI; + for (SI iter = strands.begin(); iter != strands.end(); ++iter) { if (iter->kidId == kidId) return &(*iter); } return NULL; } -void Ipc::Coordinator::registerStrand(const StrandData& strand) +void Ipc::Coordinator::registerStrand(const StrandCoord& strand) { - if (StrandData* found = findStrand(strand.kidId)) + if (StrandCoord* found = findStrand(strand.kidId)) *found = strand; else strands.push_back(strand); } -void Ipc::Coordinator::receive(const Message& message) +void Ipc::Coordinator::receive(const TypedMsgHdr& message) { switch (message.type()) { case mtRegistration: debugs(54, 6, HERE << "Registration request"); - handleRegistrationRequest(message.strand()); + 
handleRegistrationRequest(StrandCoord(message)); break; default: @@ -55,19 +56,20 @@ void Ipc::Coordinator::receive(const Message& message) } } -void Ipc::Coordinator::handleRegistrationRequest(const StrandData& strand) +void Ipc::Coordinator::handleRegistrationRequest(const StrandCoord& strand) { registerStrand(strand); // send back an acknowledgement; TODO: remove as not needed? - SendMessage(MakeAddr(strandAddrPfx, strand.kidId), - Message(mtRegistration, strand.kidId, strand.pid)); + TypedMsgHdr message; + strand.pack(message); + SendMessage(MakeAddr(strandAddrPfx, strand.kidId), message); } void Ipc::Coordinator::broadcastSignal(int sig) const { - typedef Vector<StrandData>::const_iterator VSDCI; - for (VSDCI iter = strands.begin(); iter != strands.end(); ++iter) { + typedef Strands::const_iterator SCI; + for (SCI iter = strands.begin(); iter != strands.end(); ++iter) { debugs(54, 5, HERE << "signal " << sig << " to kid" << iter->kidId << ", PID=" << iter->pid); kill(iter->pid, sig); diff --git a/src/ipc/Coordinator.h b/src/ipc/Coordinator.h index f2fd75b71a..1d81a4ed7e 100644 --- a/src/ipc/Coordinator.h +++ b/src/ipc/Coordinator.h @@ -11,7 +11,7 @@ #include "Array.h" #include "ipc/Port.h" - +#include "ipc/Messages.h" namespace Ipc { @@ -29,14 +29,15 @@ public: protected: virtual void start(); // Port (AsyncJob) API - virtual void receive(const Message& message); // Port API + virtual void receive(const TypedMsgHdr& message); // Port API - StrandData* findStrand(int kidId); ///< registered strand or NULL - void registerStrand(const StrandData &); ///< adds or updates existing - void handleRegistrationRequest(const StrandData &); ///< registers and ACKs + StrandCoord* findStrand(int kidId); ///< registered strand or NULL + void registerStrand(const StrandCoord &); ///< adds or updates existing + void handleRegistrationRequest(const StrandCoord &); ///< register,ACK private: - Vector<StrandData> strands; ///< registered processes and threads + typedef Vector<StrandCoord> Strands; ///< unsorted 
strands + Strands strands; ///< registered processes and threads static Coordinator* TheInstance; ///< the only class instance in existence CBDATA_CLASS2(Coordinator); diff --git a/src/ipc/Makefile.am b/src/ipc/Makefile.am index 6e18342cfc..3c901415c9 100644 --- a/src/ipc/Makefile.am +++ b/src/ipc/Makefile.am @@ -8,6 +8,10 @@ libipc_la_SOURCES = \ Kid.h \ Kids.cc \ Kids.h \ + Messages.cc \ + Messages.h \ + TypedMsgHdr.cc \ + TypedMsgHdr.h \ Coordinator.cc \ Coordinator.h \ UdsOp.cc \ diff --git a/src/ipc/Messages.cc b/src/ipc/Messages.cc new file mode 100644 index 0000000000..7060a940f4 --- /dev/null +++ b/src/ipc/Messages.cc @@ -0,0 +1,31 @@ +/* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + + +#include "config.h" +#include "comm.h" +#include "ipc/Messages.h" +#include "ipc/TypedMsgHdr.h" + + +Ipc::StrandCoord::StrandCoord(): kidId(-1), pid(0) +{ +} + +Ipc::StrandCoord::StrandCoord(int aKidId, pid_t aPid): kidId(aKidId), pid(aPid) +{ +} + +Ipc::StrandCoord::StrandCoord(const TypedMsgHdr &hdrMsg): kidId(-1), pid(0) +{ + hdrMsg.getData(mtRegistration, this, sizeof(*this)); // size of the object, not of the 'this' pointer +} + +void Ipc::StrandCoord::pack(TypedMsgHdr &hdrMsg) const +{ + hdrMsg.putData(mtRegistration, this, sizeof(*this)); // size of the object, not of the 'this' pointer +} diff --git a/src/ipc/Messages.h b/src/ipc/Messages.h new file mode 100644 index 0000000000..d7b1aae232 --- /dev/null +++ b/src/ipc/Messages.h @@ -0,0 +1,53 @@ +/* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + +#ifndef SQUID_IPC_MESSAGES_H +#define SQUID_IPC_MESSAGES_H + +#include +#include + +/// Declare IPC messages. These classes translate between high-level +/// information and low-level TypedMsgHdr (i.e., struct msghdr) buffers. 
+ +namespace Ipc +{ + +class TypedMsgHdr; + +typedef enum { mtNone = 0, mtRegistration, mtDescriptor } MessageType; + +/// Strand location details +class StrandCoord { +public: + StrandCoord(); ///< unknown location + StrandCoord(int akidId, pid_t aPid); ///< from registrant + explicit StrandCoord(const TypedMsgHdr &hdrMsg); ///< from recvmsg() + void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg() + +public: + int kidId; ///< internal Squid process number + pid_t pid; ///< OS process or thread identifier +}; + +/// a [socket] descriptor information +class Descriptor +{ +public: + explicit Descriptor(int fd); ///< from descriptor sender + explicit Descriptor(const TypedMsgHdr &hdrMsg); ///< from recvmsg() + void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg() + +public: + int fd; ///< raw descriptor value +}; + + +} // namespace Ipc; + + +#endif /* SQUID_IPC_MESSAGES_H */ diff --git a/src/ipc/Port.cc b/src/ipc/Port.cc index 5a01a700e6..8d525005cb 100644 --- a/src/ipc/Port.cc +++ b/src/ipc/Port.cc @@ -7,6 +7,7 @@ #include "config.h" +#include "CommCalls.h" #include "ipc/Port.h" const char Ipc::coordinatorAddr[] = DEFAULT_PREFIX "/var/run/coordinator.ipc"; @@ -17,6 +18,7 @@ Ipc::Port::Port(const String& aListenAddr): UdsOp(aListenAddr) { setOptions(COMM_NONBLOCKING | COMM_DOBIND); + buf.allocate(); } void Ipc::Port::start() @@ -54,11 +56,10 @@ void Ipc::Port::noteRead(const CommIoCbParams& params) " [" << this << ']'); if (params.flag == COMM_OK) { assert(params.buf == buf.raw()); - assert(params.size == buf.size()); receive(buf); } // TODO: if there was a fatal error on our socket, close the socket before - // trying to listen again and print a level-1 error message. + // trying to listen again and print a level-1 error message. 
listen(); } diff --git a/src/ipc/Port.h b/src/ipc/Port.h index 8c87280cc2..ffe791ad3c 100644 --- a/src/ipc/Port.h +++ b/src/ipc/Port.h @@ -34,13 +34,13 @@ protected: void listen(); /// handle IPC message just read - virtual void receive(const Message& message) = 0; + virtual void receive(const TypedMsgHdr& message) = 0; private: void noteRead(const CommIoCbParams &params); // Comm callback API private: - Message buf; ///< UDS read buffer filled by Comm + TypedMsgHdr buf; ///< msghdr struct filled by Comm }; diff --git a/src/ipc/Strand.cc b/src/ipc/Strand.cc index a220d3bf5e..0c8f59ebce 100644 --- a/src/ipc/Strand.cc +++ b/src/ipc/Strand.cc @@ -7,6 +7,7 @@ #include "config.h" #include "ipc/Strand.h" +#include "ipc/Messages.h" #include "ipc/Kids.h" @@ -29,18 +30,19 @@ void Ipc::Strand::registerSelf() { debugs(54, 6, HERE); Must(!isRegistered); - SendMessage(coordinatorAddr, - Message(mtRegistration, KidIdentifier, getpid())); + TypedMsgHdr message; + StrandCoord(KidIdentifier, getpid()).pack(message); + SendMessage(coordinatorAddr, message); setTimeout(6, "Ipc::Strand::timeoutHandler"); // TODO: make 6 configurable? 
} -void Ipc::Strand::receive(const Message& message) +void Ipc::Strand::receive(const TypedMsgHdr &message) { debugs(54, 6, HERE); switch (message.type()) { case mtRegistration: - handleRegistrationResponse(message.strand()); + handleRegistrationResponse(StrandCoord(message)); break; default: @@ -49,7 +51,7 @@ void Ipc::Strand::receive(const Message& message) } } -void Ipc::Strand::handleRegistrationResponse(const StrandData& strand) +void Ipc::Strand::handleRegistrationResponse(const StrandCoord &strand) { // handle registration response from the coordinator; it could be stale if (strand.kidId == KidIdentifier && strand.pid == getpid()) { diff --git a/src/ipc/Strand.h b/src/ipc/Strand.h index da207d005c..5a1e26e7b4 100644 --- a/src/ipc/Strand.h +++ b/src/ipc/Strand.h @@ -14,6 +14,7 @@ namespace Ipc { +class StrandCoord; /// Receives coordination messages on behalf of its process or thread class Strand: public Port @@ -25,11 +26,11 @@ public: protected: virtual void timedout(); // Port (UsdOp) API - virtual void receive(const Message& message); // Port API + virtual void receive(const TypedMsgHdr &message); // Port API private: void registerSelf(); /// let Coordinator know this strand exists - void handleRegistrationResponse(const StrandData& strand); + void handleRegistrationResponse(const StrandCoord &strand); private: bool isRegistered; ///< whether Coordinator ACKed registration (unused) diff --git a/src/ipc/UdsOp.cc b/src/ipc/UdsOp.cc index 5f7f7703c3..9f6c005678 100644 --- a/src/ipc/UdsOp.cc +++ b/src/ipc/UdsOp.cc @@ -8,31 +8,13 @@ #include "config.h" #include "comm.h" +#include "CommCalls.h" #include "ipc/UdsOp.h" -Ipc::Message::Message() -{ - data.messageType = mtNone; - data.strand.kidId = -1; -} - -Ipc::Message::Message(MessageType messageType, int kidId, pid_t pid) -{ - data.messageType = messageType; - data.strand.kidId = kidId; - data.strand.pid = pid; -} - -const Ipc::StrandData &Ipc::Message::strand() const -{ - Must(data.messageType == 
mtRegistration); - return data.strand; -} - Ipc::UdsOp::UdsOp(const String& pathAddr): AsyncJob("Ipc::UdsOp"), - addr(setAddr(pathAddr)), + address(PathToAddress(pathAddr)), options(COMM_NONBLOCKING), fd_(-1) { @@ -55,23 +37,13 @@ int Ipc::UdsOp::fd() { if (fd_ < 0) { if (options & COMM_DOBIND) - unlink(addr.sun_path); - fd_ = comm_open_uds(SOCK_DGRAM, 0, &addr, options); + unlink(address.sun_path); + fd_ = comm_open_uds(SOCK_DGRAM, 0, &address, options); Must(fd_ >= 0); } return fd_; } -struct sockaddr_un Ipc::UdsOp::setAddr(const String& pathAddr) -{ - assert(pathAddr.size() != 0); - struct sockaddr_un unixAddr; - memset(&unixAddr, 0, sizeof(unixAddr)); - unixAddr.sun_family = AF_LOCAL; - xstrncpy(unixAddr.sun_path, pathAddr.termedBuf(), sizeof(unixAddr.sun_path)); - return unixAddr; -} - void Ipc::UdsOp::setTimeout(int seconds, const char *handlerName) { AsyncCall::Pointer handler = asyncCall(54,5, handlerName, @@ -91,15 +63,28 @@ void Ipc::UdsOp::noteTimeout(const CommTimeoutCbParams &) } +struct sockaddr_un +Ipc::PathToAddress(const String& pathAddr) +{ + assert(pathAddr.size() != 0); + struct sockaddr_un unixAddr; + memset(&unixAddr, 0, sizeof(unixAddr)); + unixAddr.sun_family = AF_LOCAL; + xstrncpy(unixAddr.sun_path, pathAddr.termedBuf(), sizeof(unixAddr.sun_path)); + return unixAddr; +} + + CBDATA_NAMESPACED_CLASS_INIT(Ipc, UdsSender); -Ipc::UdsSender::UdsSender(const String& pathAddr, const Message& aMessage): +Ipc::UdsSender::UdsSender(const String& pathAddr, const TypedMsgHdr& aMessage): UdsOp(pathAddr), message(aMessage), - retries(4), // TODO: make configurable? - timeout(5), // TODO: make configurable? + retries(10), // TODO: make configurable? + timeout(10), // TODO: make configurable? 
writing(false) { + message.address(address); } void Ipc::UdsSender::start() @@ -128,8 +113,10 @@ void Ipc::UdsSender::wrote(const CommIoCbParams& params) { debugs(54, 5, HERE << "FD " << params.fd << " flag " << params.flag << " [" << this << ']'); writing = false; - if (params.flag != COMM_OK && retries-- > 0) + if (params.flag != COMM_OK && retries-- > 0) { + sleep(1); // do not spend all tries at once; XXX: use an async timed event instead of blocking here; store the time when we started writing so that we do not sleep if not needed? write(); // XXX: should we close on error so that fd() reopens? + } } void Ipc::UdsSender::timedout() @@ -139,7 +126,7 @@ void Ipc::UdsSender::timedout() } -void Ipc::SendMessage(const String& toAddress, const Message& message) +void Ipc::SendMessage(const String& toAddress, const TypedMsgHdr &message) { AsyncJob::AsyncStart(new UdsSender(toAddress, message)); } diff --git a/src/ipc/UdsOp.h b/src/ipc/UdsOp.h index 7801f25b98..32b58da232 100644 --- a/src/ipc/UdsOp.h +++ b/src/ipc/UdsOp.h @@ -10,46 +10,15 @@ #include "SquidString.h" -#include "CommCalls.h" +#include "base/AsyncJob.h" +#include "ipc/TypedMsgHdr.h" +class CommTimeoutCbParams; +class CommIoCbParams; namespace Ipc { - -typedef enum { mtNone = 0, mtRegistration } MessageType; - -/// Strand registration information -struct StrandData -{ - int kidId; - pid_t pid; -}; - -/// information sent or received during IPC -class Message -{ -public: - Message(); - Message(MessageType messageType, int kidId, pid_t pid); - - /// raw, type-independent access - int type() const { return data.messageType; } - char *raw() { return reinterpret_cast(&data); } - const char *raw() const { return reinterpret_cast(&data); } - size_t size() const { return sizeof(data); } - - /// type-dependent access - const StrandData& strand() const; - -private: - struct Data { - int messageType; - StrandData strand; - // TODO: redesign to better handle many type-specific datas like strand - } data; ///< 
everything being sent or received -}; - /// code shared by unix-domain socket senders (e.g., UdsSender or Coordinator) /// and receivers (e.g. Port or Coordinator) class UdsOp: public AsyncJob @@ -58,6 +27,9 @@ public: UdsOp(const String &pathAddr); virtual ~UdsOp(); +public: + struct sockaddr_un address; ///< UDS address from path; treat as read-only + protected: virtual void timedout() {} ///< called after setTimeout() if timed out @@ -73,11 +45,7 @@ private: /// Comm timeout callback; calls timedout() void noteTimeout(const CommTimeoutCbParams &p); - /// configures addr member - struct sockaddr_un setAddr(const String &pathAddr); - private: - struct sockaddr_un addr; ///< UDS address int options; ///< UDS options int fd_; ///< UDS descriptor @@ -86,12 +54,17 @@ private: UdsOp &operator= (const UdsOp &); // not implemented }; +/// converts human-readable filename path into UDS address +extern struct sockaddr_un PathToAddress(const String &pathAddr); + + + // XXX: move UdsSender code to UdsSender.{cc,h} /// attempts to send an IPC message a few times, with a timeout class UdsSender: public UdsOp { public: - UdsSender(const String& pathAddr, const Message& aMessage); + UdsSender(const String& pathAddr, const TypedMsgHdr& aMessage); protected: virtual void start(); // UdsOp (AsyncJob) API @@ -103,7 +76,7 @@ private: void wrote(const CommIoCbParams& params); ///< done writing or error private: - Message message; ///< what to send + TypedMsgHdr message; ///< what to send int retries; ///< how many times to try after a write error int timeout; ///< total time to send the message bool writing; ///< whether Comm started and did not finish writing @@ -116,7 +89,7 @@ private: }; -void SendMessage(const String& toAddress, const Message& message); +void SendMessage(const String& toAddress, const TypedMsgHdr& message); } -- 2.47.3