]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Switched from sendto/recvfrom to sendmsg/recvmsg for UDS I/O. Replaced
authorAlex Rousskov <rousskov@measurement-factory.com>
Thu, 29 Apr 2010 20:12:03 +0000 (14:12 -0600)
committerAlex Rousskov <rousskov@measurement-factory.com>
Thu, 29 Apr 2010 20:12:03 +0000 (14:12 -0600)
inlined sendto/recvfrom hacks with FD_WRITE/READ_METHOD-based code.

A common msghdr-based interface allows us to use the same API for regular
IPC messages and for future messages that pass socket descriptors. While
msghdr allows for complex vector-based scatter/gather I/O, the IPC code
limits complexity by using a single-element I/O vector and a control message
part dedicated to passing descriptors.

Added a temporary hack to block-sleep between IPC message sending attempts so
that we do not use up all the allowed attempts in a short period of time. The
hack will be replace with a non-blocking addEvent-based sleep.

15 files changed:
src/comm.cc
src/enums.h
src/fd.cc
src/fde.h
src/ipc/Coordinator.cc
src/ipc/Coordinator.h
src/ipc/Makefile.am
src/ipc/Messages.cc [new file with mode: 0644]
src/ipc/Messages.h [new file with mode: 0644]
src/ipc/Port.cc
src/ipc/Port.h
src/ipc/Strand.cc
src/ipc/Strand.h
src/ipc/UdsOp.cc
src/ipc/UdsOp.h

index f2dd9881529f7d7e28e66cbed4d9d94aeac18271..8ce4c9a8e922d35a3687cc700f6adc1d5ec8e195 100644 (file)
@@ -1883,11 +1883,7 @@ commHandleWrite(int fd, void *data)
            (long int) state->offset << ", sz " << (long int) state->size << ".");
 
     nleft = state->size - state->offset;
-    // XXX: tmp hack: call sendto() for unconnected UDS sockets
-    if (!fd_table[fd].flags.called_connect && fd_table[fd].unix_addr.sun_family == AF_LOCAL)
-        len = sendto(fd, state->buf + state->offset, nleft, 0, (sockaddr*)&fd_table[fd].unix_addr, SUN_LEN(&fd_table[fd].unix_addr));
-    else
-        len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft);
+    len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft);
     debugs(5, 5, "commHandleWrite: write() returns " << len);
     fd_bytes(fd, len, FD_WRITE);
     statCounter.syscalls.sock.writes++;
@@ -2464,14 +2460,12 @@ comm_open_uds(int sock_type,
     debugs(50, 5, HERE << "FD " << new_socket << " is a new socket");
 
     assert(!isOpen(new_socket));
-    fd_open(new_socket, FD_SOCKET, NULL);
+    fd_open(new_socket, FD_MSGHDR, NULL);
 
     fdd_table[new_socket].close_file = NULL;
 
     fdd_table[new_socket].close_line = 0;
 
-    fd_table[new_socket].unix_addr = *addr;
-
     fd_table[new_socket].sock_family = AI.ai_family;
 
     if (!(flags & COMM_NOCLOEXEC))
index d7aa6f8b0ac4ee25438ca96f0cec66e142a737fb..dcdf542d5b2181bb6fda4c1fd79969abe6a15352 100644 (file)
@@ -69,6 +69,7 @@ enum fd_type {
     FD_FILE,
     FD_SOCKET,
     FD_PIPE,
+    FD_MSGHDR,
     FD_UNKNOWN
 };
 
index 144d36b720d86bfe94e985a0bdcf3d7669efb0b4..a22b38b7cdd2af5343bdcdc4e2ac6388d633edec 100644 (file)
--- a/src/fd.cc
+++ b/src/fd.cc
@@ -45,6 +45,9 @@ int socket_read_method(int, char *, int);
 int socket_write_method(int, const char *, int);
 int file_read_method(int, char *, int);
 int file_write_method(int, const char *, int);
+#else
+int msghdr_read_method(int, char *, int);
+int msghdr_write_method(int, const char *, int);
 #endif
 
 const char *fdTypeStr[] = {
@@ -53,6 +56,7 @@ const char *fdTypeStr[] = {
     "File",
     "Socket",
     "Pipe",
+    "MsgHdr",
     "Unknown"
 };
 
@@ -169,6 +173,24 @@ default_write_method(int fd, const char *buf, int len)
     return i;
 }
 
+int
+msghdr_read_method(int fd, char *buf, int len)
+{
+    PROF_start(read);
+    const int i = recvmsg(fd, reinterpret_cast<msghdr*>(buf), MSG_DONTWAIT);
+    PROF_stop(read);
+    return i;
+}
+
+int
+msghdr_write_method(int fd, const char *buf, int len)
+{
+    PROF_start(write);
+    const int i = sendmsg(fd, reinterpret_cast<const msghdr*>(buf), MSG_NOSIGNAL);
+    PROF_stop(write);
+    return i > 0 ? len : i; // len is imprecise but the caller expects a match
+}
+
 #endif
 
 void
@@ -213,9 +235,18 @@ fd_open(int fd, unsigned int type, const char *desc)
     }
 
 #else
-    F->read_method = &default_read_method;
+    switch (type) {
 
-    F->write_method = &default_write_method;
+    case FD_MSGHDR:
+        F->read_method = &msghdr_read_method;
+        F->write_method = &msghdr_write_method;
+        break;
+
+    default:
+        F->read_method = &default_read_method;
+        F->write_method = &default_write_method;
+        break;
+    }
 
 #endif
 
index 6a7365c0410e03025dba2046375aa748f0503b3b..a0b903d1a3d43ebf4c2c9cc1566575757c311ba1 100644 (file)
--- a/src/fde.h
+++ b/src/fde.h
@@ -60,7 +60,6 @@ public:
     int sock_family;
     char ipaddr[MAX_IPSTRLEN];            /* dotted decimal address of peer */
     char desc[FD_DESC_SZ];
-    struct sockaddr_un unix_addr; ///< unix domain socket (UDS) address, if any
 
     struct {
         unsigned int open:1;
index 6976b1c5ab03f1a6aab6b1fba2c4a0b22310aa5f..02f91a70c378249a17173a41b09b44ec65d6083c 100644 (file)
@@ -24,29 +24,30 @@ void Ipc::Coordinator::start()
     Port::start();
 }
 
-Ipc::StrandData* Ipc::Coordinator::findStrand(int kidId)
+Ipc::StrandCoord* Ipc::Coordinator::findStrand(int kidId)
 {
-    for (Vector<StrandData>::iterator iter = strands.begin(); iter != strands.end(); ++iter) {
+    typedef Strands::iterator SI;
+    for (SI iter = strands.begin(); iter != strands.end(); ++iter) {
         if (iter->kidId == kidId)
             return &(*iter);
     }
     return NULL;
 }
 
-void Ipc::Coordinator::registerStrand(const StrandData& strand)
+void Ipc::Coordinator::registerStrand(const StrandCoord& strand)
 {
-    if (StrandData* found = findStrand(strand.kidId))
+    if (StrandCoord* found = findStrand(strand.kidId))
         *found = strand;
     else
         strands.push_back(strand);
 }
 
-void Ipc::Coordinator::receive(const Message& message)
+void Ipc::Coordinator::receive(const TypedMsgHdr& message)
 {
     switch (message.type()) {
     case mtRegistration:
         debugs(54, 6, HERE << "Registration request");
-        handleRegistrationRequest(message.strand());
+        handleRegistrationRequest(StrandCoord(message));
         break;
 
     default:
@@ -55,19 +56,20 @@ void Ipc::Coordinator::receive(const Message& message)
     }
 }
 
-void Ipc::Coordinator::handleRegistrationRequest(const StrandData& strand)
+void Ipc::Coordinator::handleRegistrationRequest(const StrandCoord& strand)
 {
     registerStrand(strand);
 
     // send back an acknowledgement; TODO: remove as not needed?
-    SendMessage(MakeAddr(strandAddrPfx, strand.kidId),
-        Message(mtRegistration, strand.kidId, strand.pid));
+    TypedMsgHdr message;
+    strand.pack(message);
+    SendMessage(MakeAddr(strandAddrPfx, strand.kidId), message);
 }
 
 void Ipc::Coordinator::broadcastSignal(int sig) const
 {
-    typedef Vector<StrandData>::const_iterator VSDCI;
-    for (VSDCI iter = strands.begin(); iter != strands.end(); ++iter) {
+    typedef Strands::const_iterator SCI;
+    for (SCI iter = strands.begin(); iter != strands.end(); ++iter) {
         debugs(54, 5, HERE << "signal " << sig << " to kid" << iter->kidId <<
             ", PID=" << iter->pid);
         kill(iter->pid, sig);
index f2fd75b71a737ad187f2f979dfb25571ac15b635..1d81a4ed7ea2c37c434d83c3ecd3a548ab089014 100644 (file)
@@ -11,7 +11,7 @@
 
 #include "Array.h"
 #include "ipc/Port.h"
-
+#include "ipc/Messages.h"
 
 namespace Ipc
 {
@@ -29,14 +29,15 @@ public:
 
 protected:
     virtual void start(); // Port (AsyncJob) API
-    virtual void receive(const Message& message); // Port API
+    virtual void receive(const TypedMsgHdr& message); // Port API
 
-    StrandData* findStrand(int kidId); ///< registered strand or NULL
-    void registerStrand(const StrandData &); ///< adds or updates existing
-    void handleRegistrationRequest(const StrandData &); ///< registers and ACKs
+    StrandCoord* findStrand(int kidId); ///< registered strand or NULL
+    void registerStrand(const StrandCoord &); ///< adds or updates existing
+    void handleRegistrationRequest(const StrandCoord &); ///< register,ACK
 
 private:
-    Vector<StrandData> strands; ///< registered processes and threads
+    typedef Vector<StrandCoord> Strands; ///< unsorted strands
+    Strands strands; ///< registered processes and threads
     static Coordinator* TheInstance; ///< the only class instance in existence
 
     CBDATA_CLASS2(Coordinator);
index 6e18342cfcf07ea622d6e4196b0e4cd518c51373..3c901415c9f1dcd54f612cfb83aeac7a31a44f89 100644 (file)
@@ -8,6 +8,10 @@ libipc_la_SOURCES = \
        Kid.h \
        Kids.cc \
        Kids.h \
+       Messages.cc \
+       Messages.h \
+       TypedMsgHdr.cc \
+       TypedMsgHdr.h \
        Coordinator.cc \
        Coordinator.h \
        UdsOp.cc \
diff --git a/src/ipc/Messages.cc b/src/ipc/Messages.cc
new file mode 100644 (file)
index 0000000..7060a94
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 54    Interprocess Communication
+ *
+ */
+
+
+#include "config.h"
+#include "comm.h"
+#include "ipc/Messages.h"
+#include "ipc/TypedMsgHdr.h"
+
+
+Ipc::StrandCoord::StrandCoord(): kidId(-1), pid(0)
+{
+}
+
+Ipc::StrandCoord::StrandCoord(int aKidId, pid_t aPid): kidId(aKidId), pid(aPid)
+{
+}
+
+Ipc::StrandCoord::StrandCoord(const TypedMsgHdr &hdrMsg): kidId(-1), pid(0)
+{
+       hdrMsg.getData(mtRegistration, this, sizeof(this));
+}
+
+void Ipc::StrandCoord::pack(TypedMsgHdr &hdrMsg) const
+{
+    hdrMsg.putData(mtRegistration, this, sizeof(this));
+}
diff --git a/src/ipc/Messages.h b/src/ipc/Messages.h
new file mode 100644 (file)
index 0000000..d7b1aae
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 54    Interprocess Communication
+ *
+ */
+
+#ifndef SQUID_IPC_MESSAGES_H
+#define SQUID_IPC_MESSAGES_H
+
+#include <sys/types.h>
+#include <sys/socket.h>
+
+/// Declare IPC messages. These classes translate between high-level
+/// information and low-level TypedMsgHdr (i.e., struct msghdr) buffers.
+
+namespace Ipc
+{
+
+class TypedMsgHdr;
+
+typedef enum { mtNone = 0, mtRegistration, mtDescriptor } MessageType;
+
+/// Strand location details
+class StrandCoord {
+public:
+       StrandCoord(); ///< unknown location
+       StrandCoord(int akidId, pid_t aPid); ///< from registrant
+       explicit StrandCoord(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
+       void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
+
+public:
+    int kidId; ///< internal Squid process number
+    pid_t pid; ///< OS process or thread identifier
+};
+
+/// a [socket] descriptor information
+class Descriptor
+{
+public:
+    explicit Descriptor(int fd); ///< from descriptor sender
+       explicit Descriptor(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
+       void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
+
+public:
+    int fd; ///< raw descriptor value
+};
+
+
+} // namespace Ipc;
+
+
+#endif /* SQUID_IPC_MESSAGES_H */
index 5a01a700e619bef3b0914088dbcdea157029f8de..8d525005cbe66ab02cf39bc83cad98c9efa2478f 100644 (file)
@@ -7,6 +7,7 @@
 
 
 #include "config.h"
+#include "CommCalls.h"
 #include "ipc/Port.h"
 
 const char Ipc::coordinatorAddr[] = DEFAULT_PREFIX "/var/run/coordinator.ipc";
@@ -17,6 +18,7 @@ Ipc::Port::Port(const String& aListenAddr):
     UdsOp(aListenAddr)
 {
     setOptions(COMM_NONBLOCKING | COMM_DOBIND);
+    buf.allocate();
 }
 
 void Ipc::Port::start()
@@ -54,11 +56,10 @@ void Ipc::Port::noteRead(const CommIoCbParams& params)
         " [" << this << ']');
     if (params.flag == COMM_OK) {
         assert(params.buf == buf.raw());
-        assert(params.size == buf.size());
         receive(buf);
     }
     // TODO: if there was a fatal error on our socket, close the socket before
-       // trying to listen again and print a level-1 error message.
+    // trying to listen again and print a level-1 error message.
 
     listen();
 }
index 8c87280cc23ca2f891f581c60bd0d316baec0bf4..ffe791ad3c4b2be70ecd4dc7cf9f93cd08da84db 100644 (file)
@@ -34,13 +34,13 @@ protected:
     void listen();
 
     /// handle IPC message just read
-    virtual void receive(const Message& message) = 0;
+    virtual void receive(const TypedMsgHdr& message) = 0;
 
 private:
     void noteRead(const CommIoCbParams &params); // Comm callback API
 
 private:
-    Message buf; ///< UDS read buffer filled by Comm
+    TypedMsgHdr buf; ///< msghdr struct filled by Comm
 };
 
 
index a220d3bf5ea019379ded3c9709362303ab1a21af..0c8f59ebced15cf2e67354ec7084ce513fe74ab8 100644 (file)
@@ -7,6 +7,7 @@
 
 #include "config.h"
 #include "ipc/Strand.h"
+#include "ipc/Messages.h"
 #include "ipc/Kids.h"
 
 
@@ -29,18 +30,19 @@ void Ipc::Strand::registerSelf()
 {
     debugs(54, 6, HERE);
     Must(!isRegistered);
-    SendMessage(coordinatorAddr,
-               Message(mtRegistration, KidIdentifier, getpid()));
+    TypedMsgHdr message;
+    StrandCoord(KidIdentifier, getpid()).pack(message);
+    SendMessage(coordinatorAddr, message);
     setTimeout(6, "Ipc::Strand::timeoutHandler"); // TODO: make 6 configurable?
 }
 
-void Ipc::Strand::receive(const Message& message)
+void Ipc::Strand::receive(const TypedMsgHdr &message)
 {
     debugs(54, 6, HERE);
     switch (message.type()) {
 
     case mtRegistration:
-        handleRegistrationResponse(message.strand());
+        handleRegistrationResponse(StrandCoord(message));
         break;
 
     default:
@@ -49,7 +51,7 @@ void Ipc::Strand::receive(const Message& message)
     }
 }
 
-void Ipc::Strand::handleRegistrationResponse(const StrandData& strand)
+void Ipc::Strand::handleRegistrationResponse(const StrandCoord &strand)
 {
     // handle registration response from the coordinator; it could be stale
     if (strand.kidId == KidIdentifier && strand.pid == getpid()) {
index da207d005cc32e8cc083790b162d958c2f291653..5a1e26e7b47319f301d65ebb3dae7ec5d4383bcd 100644 (file)
@@ -14,6 +14,7 @@
 namespace Ipc
 {
 
+class StrandCoord;
 
 /// Receives coordination messages on behalf of its process or thread
 class Strand: public Port
@@ -25,11 +26,11 @@ public:
 
 protected:
     virtual void timedout(); // Port (UsdOp) API
-    virtual void receive(const Message& message); // Port API
+    virtual void receive(const TypedMsgHdr &message); // Port API
 
 private:
     void registerSelf(); /// let Coordinator know this strand exists
-    void handleRegistrationResponse(const StrandData& strand);
+    void handleRegistrationResponse(const StrandCoord &strand);
 
 private:
     bool isRegistered; ///< whether Coordinator ACKed registration (unused)
index 5f7f7703c367a912631787ed1bae46c5b29b9711..9f6c005678fca684ddd8d2655f133843b22cda0d 100644 (file)
@@ -8,31 +8,13 @@
 
 #include "config.h"
 #include "comm.h"
+#include "CommCalls.h"
 #include "ipc/UdsOp.h"
 
 
-Ipc::Message::Message()
-{
-    data.messageType = mtNone;
-    data.strand.kidId = -1;
-}
-
-Ipc::Message::Message(MessageType messageType, int kidId, pid_t pid)
-{
-    data.messageType = messageType;
-    data.strand.kidId = kidId;
-    data.strand.pid = pid;
-}
-
-const Ipc::StrandData &Ipc::Message::strand() const
-{
-    Must(data.messageType == mtRegistration);
-       return data.strand;
-}
-
 Ipc::UdsOp::UdsOp(const String& pathAddr):
     AsyncJob("Ipc::UdsOp"),
-    addr(setAddr(pathAddr)),
+    address(PathToAddress(pathAddr)),
     options(COMM_NONBLOCKING),
     fd_(-1)
 {
@@ -55,23 +37,13 @@ int Ipc::UdsOp::fd()
 {
     if (fd_ < 0) {
         if (options & COMM_DOBIND)
-            unlink(addr.sun_path);
-        fd_ = comm_open_uds(SOCK_DGRAM, 0, &addr, options);
+            unlink(address.sun_path);
+        fd_ = comm_open_uds(SOCK_DGRAM, 0, &address, options);
         Must(fd_ >= 0);
     }
     return fd_;
 }
 
-struct sockaddr_un Ipc::UdsOp::setAddr(const String& pathAddr)
-{
-    assert(pathAddr.size() != 0);
-    struct sockaddr_un unixAddr;
-    memset(&unixAddr, 0, sizeof(unixAddr));
-    unixAddr.sun_family = AF_LOCAL;
-    xstrncpy(unixAddr.sun_path, pathAddr.termedBuf(), sizeof(unixAddr.sun_path));
-    return unixAddr;
-}
-
 void Ipc::UdsOp::setTimeout(int seconds, const char *handlerName)
 {
     AsyncCall::Pointer handler = asyncCall(54,5, handlerName,
@@ -91,15 +63,28 @@ void Ipc::UdsOp::noteTimeout(const CommTimeoutCbParams &)
 }
 
 
+struct sockaddr_un
+Ipc::PathToAddress(const String& pathAddr)
+{
+    assert(pathAddr.size() != 0);
+    struct sockaddr_un unixAddr;
+    memset(&unixAddr, 0, sizeof(unixAddr));
+    unixAddr.sun_family = AF_LOCAL;
+    xstrncpy(unixAddr.sun_path, pathAddr.termedBuf(), sizeof(unixAddr.sun_path));
+    return unixAddr;
+}
+
+
 CBDATA_NAMESPACED_CLASS_INIT(Ipc, UdsSender);
 
-Ipc::UdsSender::UdsSender(const String& pathAddr, const Message& aMessage):
+Ipc::UdsSender::UdsSender(const String& pathAddr, const TypedMsgHdr& aMessage):
     UdsOp(pathAddr),
     message(aMessage),
-    retries(4), // TODO: make configurable?
-    timeout(5), // TODO: make configurable?
+    retries(10), // TODO: make configurable?
+    timeout(10), // TODO: make configurable?
     writing(false)
 {
+    message.address(address);
 }
 
 void Ipc::UdsSender::start()
@@ -128,8 +113,10 @@ void Ipc::UdsSender::wrote(const CommIoCbParams& params)
 {
     debugs(54, 5, HERE << "FD " << params.fd << " flag " << params.flag << " [" << this << ']');
     writing = false;
-    if (params.flag != COMM_OK && retries-- > 0)
+    if (params.flag != COMM_OK && retries-- > 0) {
+        sleep(1); // do not spend all tries at once; XXX: use an async timed event instead of blocking here; store the time when we started writing so that we do not sleep if not needed?
         write(); // XXX: should we close on error so that fd() reopens?
+    }
 }
 
 void Ipc::UdsSender::timedout()
@@ -139,7 +126,7 @@ void Ipc::UdsSender::timedout()
 }
 
 
-void Ipc::SendMessage(const String& toAddress, const Message& message)
+void Ipc::SendMessage(const String& toAddress, const TypedMsgHdr &message)
 {
     AsyncJob::AsyncStart(new UdsSender(toAddress, message));
 }
index 7801f25b98f15bde3ae166785036a163ed7a8d46..32b58da232319f748d6de1ba17ee2425e6792fee 100644 (file)
 
 
 #include "SquidString.h"
-#include "CommCalls.h"
+#include "base/AsyncJob.h"
+#include "ipc/TypedMsgHdr.h"
 
+class CommTimeoutCbParams;
+class CommIoCbParams;
 
 namespace Ipc
 {
 
-
-typedef enum { mtNone = 0, mtRegistration } MessageType;
-
-/// Strand registration information
-struct StrandData
-{
-    int kidId;
-    pid_t pid;
-};
-
-/// information sent or received during IPC
-class Message
-{
-public:
-    Message();
-    Message(MessageType messageType, int kidId, pid_t pid);
-
-    /// raw, type-independent access
-    int type() const { return data.messageType; }
-    char *raw() { return reinterpret_cast<char*>(&data); }
-    const char *raw() const { return reinterpret_cast<const char*>(&data); }
-    size_t size() const { return sizeof(data); }
-
-    /// type-dependent access
-    const StrandData& strand() const;
-
-private:
-    struct Data {
-        int messageType;
-        StrandData strand;
-        // TODO: redesign to better handle many type-specific datas like strand
-    } data; ///< everything being sent or received
-};
-
 /// code shared by unix-domain socket senders (e.g., UdsSender or Coordinator)
 /// and receivers (e.g. Port or Coordinator)
 class UdsOp: public AsyncJob
@@ -58,6 +27,9 @@ public:
     UdsOp(const String &pathAddr);
     virtual ~UdsOp();
 
+public:
+    struct sockaddr_un address; ///< UDS address from path; treat as read-only
+
 protected:
     virtual void timedout() {} ///< called after setTimeout() if timed out
 
@@ -73,11 +45,7 @@ private:
     /// Comm timeout callback; calls timedout()
     void noteTimeout(const CommTimeoutCbParams &p);
 
-    /// configures addr member
-    struct sockaddr_un setAddr(const String &pathAddr);
-
 private:
-    struct sockaddr_un addr; ///< UDS address
     int options; ///< UDS options
     int fd_; ///< UDS descriptor
 
@@ -86,12 +54,17 @@ private:
     UdsOp &operator= (const UdsOp &); // not implemented
 };
 
+/// converts human-readable filename path into UDS address
+extern struct sockaddr_un PathToAddress(const String &pathAddr);
+
+
+
 // XXX: move UdsSender code to UdsSender.{cc,h}
 /// attempts to send an IPC message a few times, with a timeout
 class UdsSender: public UdsOp
 {
 public:
-    UdsSender(const String& pathAddr, const Message& aMessage);
+    UdsSender(const String& pathAddr, const TypedMsgHdr& aMessage);
 
 protected:
     virtual void start(); // UdsOp (AsyncJob) API
@@ -103,7 +76,7 @@ private:
     void wrote(const CommIoCbParams& params); ///< done writing or error
 
 private:
-    Message message; ///< what to send
+    TypedMsgHdr message; ///< what to send
     int retries; ///< how many times to try after a write error
     int timeout; ///< total time to send the message
     bool writing; ///< whether Comm started and did not finish writing
@@ -116,7 +89,7 @@ private:
 };
 
 
-void SendMessage(const String& toAddress, const Message& message);
+void SendMessage(const String& toAddress, const TypedMsgHdr& message);
 
 
 }