inlined sendto/recvfrom hacks with FD_WRITE/READ_METHOD-based code.
A common msghdr-based interface allows us to use the same API for regular
IPC messages and for future messages that pass socket descriptors. While
msghdr allows for complex vector-based scatter/gather I/O, the IPC code
limits complexity by using a single-element I/O vector and a control message
part dedicated to passing descriptors.
Added a temporary hack to block-sleep between IPC message sending attempts so
that we do not use up all the allowed attempts in a short period of time. The
hack will be replace with a non-blocking addEvent-based sleep.
(long int) state->offset << ", sz " << (long int) state->size << ".");
nleft = state->size - state->offset;
- // XXX: tmp hack: call sendto() for unconnected UDS sockets
- if (!fd_table[fd].flags.called_connect && fd_table[fd].unix_addr.sun_family == AF_LOCAL)
- len = sendto(fd, state->buf + state->offset, nleft, 0, (sockaddr*)&fd_table[fd].unix_addr, SUN_LEN(&fd_table[fd].unix_addr));
- else
- len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft);
+ len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft);
debugs(5, 5, "commHandleWrite: write() returns " << len);
fd_bytes(fd, len, FD_WRITE);
statCounter.syscalls.sock.writes++;
debugs(50, 5, HERE << "FD " << new_socket << " is a new socket");
assert(!isOpen(new_socket));
- fd_open(new_socket, FD_SOCKET, NULL);
+ fd_open(new_socket, FD_MSGHDR, NULL);
fdd_table[new_socket].close_file = NULL;
fdd_table[new_socket].close_line = 0;
- fd_table[new_socket].unix_addr = *addr;
-
fd_table[new_socket].sock_family = AI.ai_family;
if (!(flags & COMM_NOCLOEXEC))
FD_FILE,
FD_SOCKET,
FD_PIPE,
+ FD_MSGHDR,
FD_UNKNOWN
};
int socket_write_method(int, const char *, int);
int file_read_method(int, char *, int);
int file_write_method(int, const char *, int);
+#else
+int msghdr_read_method(int, char *, int);
+int msghdr_write_method(int, const char *, int);
#endif
const char *fdTypeStr[] = {
"File",
"Socket",
"Pipe",
+ "MsgHdr",
"Unknown"
};
return i;
}
+int
+msghdr_read_method(int fd, char *buf, int len)
+{
+ PROF_start(read);
+ const int i = recvmsg(fd, reinterpret_cast<msghdr*>(buf), MSG_DONTWAIT);
+ PROF_stop(read);
+ return i;
+}
+
+int
+msghdr_write_method(int fd, const char *buf, int len)
+{
+ PROF_start(write);
+ const int i = sendmsg(fd, reinterpret_cast<const msghdr*>(buf), MSG_NOSIGNAL);
+ PROF_stop(write);
+ return i > 0 ? len : i; // len is imprecise but the caller expects a match
+}
+
#endif
void
}
#else
- F->read_method = &default_read_method;
+ switch (type) {
- F->write_method = &default_write_method;
+ case FD_MSGHDR:
+ F->read_method = &msghdr_read_method;
+ F->write_method = &msghdr_write_method;
+ break;
+
+ default:
+ F->read_method = &default_read_method;
+ F->write_method = &default_write_method;
+ break;
+ }
#endif
int sock_family;
char ipaddr[MAX_IPSTRLEN]; /* dotted decimal address of peer */
char desc[FD_DESC_SZ];
- struct sockaddr_un unix_addr; ///< unix domain socket (UDS) address, if any
struct {
unsigned int open:1;
Port::start();
}
-Ipc::StrandData* Ipc::Coordinator::findStrand(int kidId)
+Ipc::StrandCoord* Ipc::Coordinator::findStrand(int kidId)
{
- for (Vector<StrandData>::iterator iter = strands.begin(); iter != strands.end(); ++iter) {
+ typedef Strands::iterator SI;
+ for (SI iter = strands.begin(); iter != strands.end(); ++iter) {
if (iter->kidId == kidId)
return &(*iter);
}
return NULL;
}
-void Ipc::Coordinator::registerStrand(const StrandData& strand)
+void Ipc::Coordinator::registerStrand(const StrandCoord& strand)
{
- if (StrandData* found = findStrand(strand.kidId))
+ if (StrandCoord* found = findStrand(strand.kidId))
*found = strand;
else
strands.push_back(strand);
}
-void Ipc::Coordinator::receive(const Message& message)
+void Ipc::Coordinator::receive(const TypedMsgHdr& message)
{
switch (message.type()) {
case mtRegistration:
debugs(54, 6, HERE << "Registration request");
- handleRegistrationRequest(message.strand());
+ handleRegistrationRequest(StrandCoord(message));
break;
default:
}
}
-void Ipc::Coordinator::handleRegistrationRequest(const StrandData& strand)
+void Ipc::Coordinator::handleRegistrationRequest(const StrandCoord& strand)
{
registerStrand(strand);
// send back an acknowledgement; TODO: remove as not needed?
- SendMessage(MakeAddr(strandAddrPfx, strand.kidId),
- Message(mtRegistration, strand.kidId, strand.pid));
+ TypedMsgHdr message;
+ strand.pack(message);
+ SendMessage(MakeAddr(strandAddrPfx, strand.kidId), message);
}
void Ipc::Coordinator::broadcastSignal(int sig) const
{
- typedef Vector<StrandData>::const_iterator VSDCI;
- for (VSDCI iter = strands.begin(); iter != strands.end(); ++iter) {
+ typedef Strands::const_iterator SCI;
+ for (SCI iter = strands.begin(); iter != strands.end(); ++iter) {
debugs(54, 5, HERE << "signal " << sig << " to kid" << iter->kidId <<
", PID=" << iter->pid);
kill(iter->pid, sig);
#include "Array.h"
#include "ipc/Port.h"
-
+#include "ipc/Messages.h"
namespace Ipc
{
protected:
virtual void start(); // Port (AsyncJob) API
- virtual void receive(const Message& message); // Port API
+ virtual void receive(const TypedMsgHdr& message); // Port API
- StrandData* findStrand(int kidId); ///< registered strand or NULL
- void registerStrand(const StrandData &); ///< adds or updates existing
- void handleRegistrationRequest(const StrandData &); ///< registers and ACKs
+ StrandCoord* findStrand(int kidId); ///< registered strand or NULL
+ void registerStrand(const StrandCoord &); ///< adds or updates existing
+ void handleRegistrationRequest(const StrandCoord &); ///< register,ACK
private:
- Vector<StrandData> strands; ///< registered processes and threads
+ typedef Vector<StrandCoord> Strands; ///< unsorted strands
+ Strands strands; ///< registered processes and threads
static Coordinator* TheInstance; ///< the only class instance in existence
CBDATA_CLASS2(Coordinator);
Kid.h \
Kids.cc \
Kids.h \
+ Messages.cc \
+ Messages.h \
+ TypedMsgHdr.cc \
+ TypedMsgHdr.h \
Coordinator.cc \
Coordinator.h \
UdsOp.cc \
--- /dev/null
+/*
+ * $Id$
+ *
+ * DEBUG: section 54 Interprocess Communication
+ *
+ */
+
+
+#include "config.h"
+#include "comm.h"
+#include "ipc/Messages.h"
+#include "ipc/TypedMsgHdr.h"
+
+
+Ipc::StrandCoord::StrandCoord(): kidId(-1), pid(0)
+{
+}
+
+Ipc::StrandCoord::StrandCoord(int aKidId, pid_t aPid): kidId(aKidId), pid(aPid)
+{
+}
+
+Ipc::StrandCoord::StrandCoord(const TypedMsgHdr &hdrMsg): kidId(-1), pid(0)
+{
+ hdrMsg.getData(mtRegistration, this, sizeof(this));
+}
+
+void Ipc::StrandCoord::pack(TypedMsgHdr &hdrMsg) const
+{
+ hdrMsg.putData(mtRegistration, this, sizeof(this));
+}
--- /dev/null
+/*
+ * $Id$
+ *
+ * DEBUG: section 54 Interprocess Communication
+ *
+ */
+
+#ifndef SQUID_IPC_MESSAGES_H
+#define SQUID_IPC_MESSAGES_H
+
+#include <sys/types.h>
+#include <sys/socket.h>
+
+/// Declare IPC messages. These classes translate between high-level
+/// information and low-level TypedMsgHdr (i.e., struct msghdr) buffers.
+
+namespace Ipc
+{
+
+class TypedMsgHdr;
+
+typedef enum { mtNone = 0, mtRegistration, mtDescriptor } MessageType;
+
+/// Strand location details
+class StrandCoord {
+public:
+ StrandCoord(); ///< unknown location
+ StrandCoord(int akidId, pid_t aPid); ///< from registrant
+ explicit StrandCoord(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
+ void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
+
+public:
+ int kidId; ///< internal Squid process number
+ pid_t pid; ///< OS process or thread identifier
+};
+
+/// a [socket] descriptor information
+class Descriptor
+{
+public:
+ explicit Descriptor(int fd); ///< from descriptor sender
+ explicit Descriptor(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
+ void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
+
+public:
+ int fd; ///< raw descriptor value
+};
+
+
+} // namespace Ipc;
+
+
+#endif /* SQUID_IPC_MESSAGES_H */
#include "config.h"
+#include "CommCalls.h"
#include "ipc/Port.h"
const char Ipc::coordinatorAddr[] = DEFAULT_PREFIX "/var/run/coordinator.ipc";
UdsOp(aListenAddr)
{
setOptions(COMM_NONBLOCKING | COMM_DOBIND);
+ buf.allocate();
}
void Ipc::Port::start()
" [" << this << ']');
if (params.flag == COMM_OK) {
assert(params.buf == buf.raw());
- assert(params.size == buf.size());
receive(buf);
}
// TODO: if there was a fatal error on our socket, close the socket before
- // trying to listen again and print a level-1 error message.
+ // trying to listen again and print a level-1 error message.
listen();
}
void listen();
/// handle IPC message just read
- virtual void receive(const Message& message) = 0;
+ virtual void receive(const TypedMsgHdr& message) = 0;
private:
void noteRead(const CommIoCbParams ¶ms); // Comm callback API
private:
- Message buf; ///< UDS read buffer filled by Comm
+ TypedMsgHdr buf; ///< msghdr struct filled by Comm
};
#include "config.h"
#include "ipc/Strand.h"
+#include "ipc/Messages.h"
#include "ipc/Kids.h"
{
debugs(54, 6, HERE);
Must(!isRegistered);
- SendMessage(coordinatorAddr,
- Message(mtRegistration, KidIdentifier, getpid()));
+ TypedMsgHdr message;
+ StrandCoord(KidIdentifier, getpid()).pack(message);
+ SendMessage(coordinatorAddr, message);
setTimeout(6, "Ipc::Strand::timeoutHandler"); // TODO: make 6 configurable?
}
-void Ipc::Strand::receive(const Message& message)
+void Ipc::Strand::receive(const TypedMsgHdr &message)
{
debugs(54, 6, HERE);
switch (message.type()) {
case mtRegistration:
- handleRegistrationResponse(message.strand());
+ handleRegistrationResponse(StrandCoord(message));
break;
default:
}
}
-void Ipc::Strand::handleRegistrationResponse(const StrandData& strand)
+void Ipc::Strand::handleRegistrationResponse(const StrandCoord &strand)
{
// handle registration response from the coordinator; it could be stale
if (strand.kidId == KidIdentifier && strand.pid == getpid()) {
namespace Ipc
{
+class StrandCoord;
/// Receives coordination messages on behalf of its process or thread
class Strand: public Port
protected:
virtual void timedout(); // Port (UsdOp) API
- virtual void receive(const Message& message); // Port API
+ virtual void receive(const TypedMsgHdr &message); // Port API
private:
void registerSelf(); /// let Coordinator know this strand exists
- void handleRegistrationResponse(const StrandData& strand);
+ void handleRegistrationResponse(const StrandCoord &strand);
private:
bool isRegistered; ///< whether Coordinator ACKed registration (unused)
#include "config.h"
#include "comm.h"
+#include "CommCalls.h"
#include "ipc/UdsOp.h"
-Ipc::Message::Message()
-{
- data.messageType = mtNone;
- data.strand.kidId = -1;
-}
-
-Ipc::Message::Message(MessageType messageType, int kidId, pid_t pid)
-{
- data.messageType = messageType;
- data.strand.kidId = kidId;
- data.strand.pid = pid;
-}
-
-const Ipc::StrandData &Ipc::Message::strand() const
-{
- Must(data.messageType == mtRegistration);
- return data.strand;
-}
-
Ipc::UdsOp::UdsOp(const String& pathAddr):
AsyncJob("Ipc::UdsOp"),
- addr(setAddr(pathAddr)),
+ address(PathToAddress(pathAddr)),
options(COMM_NONBLOCKING),
fd_(-1)
{
{
if (fd_ < 0) {
if (options & COMM_DOBIND)
- unlink(addr.sun_path);
- fd_ = comm_open_uds(SOCK_DGRAM, 0, &addr, options);
+ unlink(address.sun_path);
+ fd_ = comm_open_uds(SOCK_DGRAM, 0, &address, options);
Must(fd_ >= 0);
}
return fd_;
}
-struct sockaddr_un Ipc::UdsOp::setAddr(const String& pathAddr)
-{
- assert(pathAddr.size() != 0);
- struct sockaddr_un unixAddr;
- memset(&unixAddr, 0, sizeof(unixAddr));
- unixAddr.sun_family = AF_LOCAL;
- xstrncpy(unixAddr.sun_path, pathAddr.termedBuf(), sizeof(unixAddr.sun_path));
- return unixAddr;
-}
-
void Ipc::UdsOp::setTimeout(int seconds, const char *handlerName)
{
AsyncCall::Pointer handler = asyncCall(54,5, handlerName,
}
+struct sockaddr_un
+Ipc::PathToAddress(const String& pathAddr)
+{
+ assert(pathAddr.size() != 0);
+ struct sockaddr_un unixAddr;
+ memset(&unixAddr, 0, sizeof(unixAddr));
+ unixAddr.sun_family = AF_LOCAL;
+ xstrncpy(unixAddr.sun_path, pathAddr.termedBuf(), sizeof(unixAddr.sun_path));
+ return unixAddr;
+}
+
+
CBDATA_NAMESPACED_CLASS_INIT(Ipc, UdsSender);
-Ipc::UdsSender::UdsSender(const String& pathAddr, const Message& aMessage):
+Ipc::UdsSender::UdsSender(const String& pathAddr, const TypedMsgHdr& aMessage):
UdsOp(pathAddr),
message(aMessage),
- retries(4), // TODO: make configurable?
- timeout(5), // TODO: make configurable?
+ retries(10), // TODO: make configurable?
+ timeout(10), // TODO: make configurable?
writing(false)
{
+ message.address(address);
}
void Ipc::UdsSender::start()
{
debugs(54, 5, HERE << "FD " << params.fd << " flag " << params.flag << " [" << this << ']');
writing = false;
- if (params.flag != COMM_OK && retries-- > 0)
+ if (params.flag != COMM_OK && retries-- > 0) {
+ sleep(1); // do not spend all tries at once; XXX: use an async timed event instead of blocking here; store the time when we started writing so that we do not sleep if not needed?
write(); // XXX: should we close on error so that fd() reopens?
+ }
}
void Ipc::UdsSender::timedout()
}
-void Ipc::SendMessage(const String& toAddress, const Message& message)
+void Ipc::SendMessage(const String& toAddress, const TypedMsgHdr &message)
{
AsyncJob::AsyncStart(new UdsSender(toAddress, message));
}
#include "SquidString.h"
-#include "CommCalls.h"
+#include "base/AsyncJob.h"
+#include "ipc/TypedMsgHdr.h"
+class CommTimeoutCbParams;
+class CommIoCbParams;
namespace Ipc
{
-
-typedef enum { mtNone = 0, mtRegistration } MessageType;
-
-/// Strand registration information
-struct StrandData
-{
- int kidId;
- pid_t pid;
-};
-
-/// information sent or received during IPC
-class Message
-{
-public:
- Message();
- Message(MessageType messageType, int kidId, pid_t pid);
-
- /// raw, type-independent access
- int type() const { return data.messageType; }
- char *raw() { return reinterpret_cast<char*>(&data); }
- const char *raw() const { return reinterpret_cast<const char*>(&data); }
- size_t size() const { return sizeof(data); }
-
- /// type-dependent access
- const StrandData& strand() const;
-
-private:
- struct Data {
- int messageType;
- StrandData strand;
- // TODO: redesign to better handle many type-specific datas like strand
- } data; ///< everything being sent or received
-};
-
/// code shared by unix-domain socket senders (e.g., UdsSender or Coordinator)
/// and receivers (e.g. Port or Coordinator)
class UdsOp: public AsyncJob
UdsOp(const String &pathAddr);
virtual ~UdsOp();
+public:
+ struct sockaddr_un address; ///< UDS address from path; treat as read-only
+
protected:
virtual void timedout() {} ///< called after setTimeout() if timed out
/// Comm timeout callback; calls timedout()
void noteTimeout(const CommTimeoutCbParams &p);
- /// configures addr member
- struct sockaddr_un setAddr(const String &pathAddr);
-
private:
- struct sockaddr_un addr; ///< UDS address
int options; ///< UDS options
int fd_; ///< UDS descriptor
UdsOp &operator= (const UdsOp &); // not implemented
};
+/// converts human-readable filename path into UDS address
+extern struct sockaddr_un PathToAddress(const String &pathAddr);
+
+
+
// XXX: move UdsSender code to UdsSender.{cc,h}
/// attempts to send an IPC message a few times, with a timeout
class UdsSender: public UdsOp
{
public:
- UdsSender(const String& pathAddr, const Message& aMessage);
+ UdsSender(const String& pathAddr, const TypedMsgHdr& aMessage);
protected:
virtual void start(); // UdsOp (AsyncJob) API
void wrote(const CommIoCbParams& params); ///< done writing or error
private:
- Message message; ///< what to send
+ TypedMsgHdr message; ///< what to send
int retries; ///< how many times to try after a write error
int timeout; ///< total time to send the message
bool writing; ///< whether Comm started and did not finish writing
};
-void SendMessage(const String& toAddress, const Message& message);
+void SendMessage(const String& toAddress, const TypedMsgHdr& message);
}